This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch jsancio_KAFKA-16535 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 2dbb6d7aa6622fe87b61fbb444fe075085870c3b Author: Colin P. McCabe <[email protected]> AuthorDate: Tue Jul 16 12:16:56 2024 -0700 KAFKA-16535; Implement KRaft add voter handling --- .../common/errors/DuplicateVoterException.java | 30 + .../org/apache/kafka/common/protocol/Errors.java | 4 +- .../kafka/common/requests/ApiVersionsRequest.java | 2 +- .../main/scala/kafka/server/ControllerApis.scala | 2 +- .../main/java/org/apache/kafka/raft/Endpoints.java | 28 + .../org/apache/kafka/raft/KafkaRaftClient.java | 150 ++++- .../java/org/apache/kafka/raft/LeaderState.java | 43 +- .../main/java/org/apache/kafka/raft/RaftUtil.java | 35 ++ .../kafka/raft/internals/AddVoterHandler.java | 379 ++++++++++++ .../kafka/raft/internals/AddVoterHandlerState.java | 92 +++ .../kafka/raft/internals/BatchAccumulator.java | 30 +- .../kafka/raft/internals/DefaultRequestSender.java | 110 ++++ .../internals/KRaftControlRecordStateMachine.java | 17 +- .../apache/kafka/raft/internals/RequestSender.java | 53 ++ .../org/apache/kafka/raft/internals/VoterSet.java | 8 + .../kafka/raft/internals/VoterSetHistory.java | 7 + .../kafka/raft/KafkaRaftClientReconfigTest.java | 663 ++++++++++++++++++++- .../apache/kafka/raft/RaftClientTestContext.java | 80 ++- 18 files changed, 1702 insertions(+), 31 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java new file mode 100644 index 00000000000..11df6eaced2 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateVoterException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class DuplicateVoterException extends ApiException { + + private static final long serialVersionUID = 1L; + + public DuplicateVoterException(String message) { + super(message); + } + + public DuplicateVoterException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index ded58495e6d..9363d827267 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.errors.DelegationTokenOwnerMismatchException; import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException; import org.apache.kafka.common.errors.DuplicateResourceException; import org.apache.kafka.common.errors.DuplicateSequenceException; +import org.apache.kafka.common.errors.DuplicateVoterException; import org.apache.kafka.common.errors.ElectionNotNeededException; import org.apache.kafka.common.errors.EligibleLeadersNotAvailableException; import org.apache.kafka.common.errors.FeatureUpdateFailedException; @@ -405,7 +406,8 @@ public enum Errors { SHARE_SESSION_NOT_FOUND(122, "The share session was not found.", ShareSessionNotFoundException::new), 
INVALID_SHARE_SESSION_EPOCH(123, "The share session epoch is invalid.", InvalidShareSessionEpochException::new), FENCED_STATE_EPOCH(124, "The share coordinator rejected the request because the share-group state epoch did not match.", FencedStateEpochException::new), - INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving replica's key.", InvalidVoterKeyException::new); + INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving replica's key.", InvalidVoterKeyException::new), + DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", DuplicateVoterException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 6a1165cbecb..c286ceaf2d3 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -30,7 +30,7 @@ import java.util.regex.Pattern; public class ApiVersionsRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder<ApiVersionsRequest> { - private static final String DEFAULT_CLIENT_SOFTWARE_NAME = "apache-kafka-java"; + public static final String DEFAULT_CLIENT_SOFTWARE_NAME = "apache-kafka-java"; private static final ApiVersionsRequestData DATA = new ApiVersionsRequestData() .setClientSoftwareName(DEFAULT_CLIENT_SOFTWARE_NAME) diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index 59d5d0bef10..1730bd9a4cc 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -1086,7 +1086,7 @@ class ControllerApis( def handleAddRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { 
authHelper.authorizeClusterOperation(request, ALTER) - throw new UnsupportedVersionException("handleAddRaftVoter is not supported yet.") + handleRaftRequest(request, response => new AddRaftVoterResponse(response.asInstanceOf[AddRaftVoterResponseData])) } def handleRemoveRaftVoter(request: RequestChannel.Request): CompletableFuture[Unit] = { diff --git a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java index 0704e5ddf7b..41292252ae6 100644 --- a/raft/src/main/java/org/apache/kafka/raft/Endpoints.java +++ b/raft/src/main/java/org/apache/kafka/raft/Endpoints.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.message.AddRaftVoterRequestData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.EndQuorumEpochRequestData; @@ -101,6 +102,21 @@ public final class Endpoints { return leaderEndpoints; } + public AddRaftVoterRequestData.ListenerCollection toAddVoterRequest() { + AddRaftVoterRequestData.ListenerCollection listeners = + new AddRaftVoterRequestData.ListenerCollection(endpoints.size()); + for (Map.Entry<ListenerName, InetSocketAddress> entry : endpoints.entrySet()) { + listeners.add( + new AddRaftVoterRequestData.Listener() + .setName(entry.getKey().value()) + .setHost(entry.getValue().getHostString()) + .setPort(entry.getValue().getPort()) + ); + } + + return listeners; + } + private static final Endpoints EMPTY = new Endpoints(Collections.emptyMap()); public static Endpoints empty() { return EMPTY; @@ -230,4 +246,16 @@ public final class Endpoints { ) .orElse(Endpoints.empty()); } + + public static Endpoints fromAddVoterRequest(AddRaftVoterRequestData.ListenerCollection endpoints) { + Map<ListenerName, InetSocketAddress> listeners = new HashMap<>(endpoints.size()); + for (AddRaftVoterRequestData.Listener endpoint : endpoints) { + 
listeners.put( + ListenerName.normalised(endpoint.name()), + InetSocketAddress.createUnresolved(endpoint.host(), endpoint.port()) + ); + } + + return new Endpoints(listeners); + } } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 17bf64f4666..a359b75e862 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -24,6 +24,9 @@ import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.NotLeaderOrFollowerException; import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.DescribeQuorumRequestData; @@ -57,10 +60,12 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; import org.apache.kafka.raft.errors.NotLeaderException; +import org.apache.kafka.raft.internals.AddVoterHandler; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.BatchMemoryPool; import org.apache.kafka.raft.internals.BlockingMessageQueue; import org.apache.kafka.raft.internals.CloseListener; +import org.apache.kafka.raft.internals.DefaultRequestSender; import org.apache.kafka.raft.internals.FuturePurgatory; import org.apache.kafka.raft.internals.KRaftControlRecordStateMachine; import org.apache.kafka.raft.internals.KafkaRaftMetrics; @@ -196,6 +201,9 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { private volatile 
QuorumState quorum; private volatile RequestManager requestManager; + // Specialized handles + private volatile AddVoterHandler addVoterHandler; + /** * Create a new instance. * @@ -333,6 +341,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { logger.debug("Leader high watermark updated to {}", highWatermark); log.updateHighWatermark(highWatermark); + // Notify the add voter handler that the HWM has been updated incase there are add + // voter request that need to be completed + addVoterHandler.highWatermarkUpdated(state); + // After updating the high watermark, we first clear the append // purgatory so that we have an opportunity to route the pending // records still held in memory directly to the listener @@ -351,8 +363,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // Send snapshot to the listener, if the listener is at the beginning of the log and there is a snapshot, // or the listener is trying to read an offset for which there isn't a segment in the log. 
if (nextExpectedOffset < highWatermark && - ((nextExpectedOffset == 0 && latestSnapshot().isPresent()) || - nextExpectedOffset < log.startOffset()) + ((nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET || nextExpectedOffset < log.startOffset()) + && latestSnapshot().isPresent()) ) { SnapshotReader<T> snapshot = latestSnapshot().orElseThrow(() -> new IllegalStateException( String.format( @@ -364,6 +376,20 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { ) )); listenerContext.fireHandleSnapshot(snapshot); + } else if (nextExpectedOffset == ListenerContext.STARTING_NEXT_OFFSET) { + // Reset the next offset to 0 since it is a new listener context and there is no + // bootstraping checkpoint + listenerContext.resetOffsetForEmptyPartition(); + } else if (nextExpectedOffset < log.startOffset()) { + throw new IllegalStateException( + String.format( + "Snapshot expected since next offset of %s is %d, log start offset is %d and high-watermark is %d", + listenerContext.listenerName(), + nextExpectedOffset, + log.startOffset(), + highWatermark + ) + ); } }); @@ -489,6 +515,19 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { if (quorum.isOnlyVoter() && !quorum.isCandidate()) { transitionToCandidate(currentTimeMs); } + + // Specialized handlers + this.addVoterHandler = new AddVoterHandler( + partitionState, + new DefaultRequestSender( + requestManager, + channel, + messageQueue, + logContext + ), + time, + logContext + ); } @Override @@ -1970,6 +2009,80 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { return true; } + private CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest( + RaftRequest.Inbound requestMetadata, + long currentTimeMs + ) { + AddRaftVoterRequestData data = (AddRaftVoterRequestData) requestMetadata.data(); + + if (!hasValidClusterId(data.clusterId())) { + return completedFuture( + new AddRaftVoterResponseData().setErrorCode(Errors.INCONSISTENT_CLUSTER_ID.code()) + ); + } + + 
Optional<Errors> leaderValidation = validateLeaderOnlyRequest(quorum.epoch()); + if (leaderValidation.isPresent()) { + return completedFuture( + new AddRaftVoterResponseData().setErrorCode(leaderValidation.get().code()) + ); + } + + Optional<ReplicaKey> newVoter = RaftUtil.addVoterRequestVoterKey(data); + if (!newVoter.isPresent()) { + return completedFuture( + new AddRaftVoterResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage("Add voter request didn't include a valid voter") + ); + } + + Endpoints newVoterEndpoints = Endpoints.fromAddVoterRequest(data.listeners()); + if (!newVoterEndpoints.address(channel.listenerName()).isPresent()) { + return completedFuture( + new AddRaftVoterResponseData() + .setErrorCode(Errors.INVALID_REQUEST.code()) + .setErrorMessage( + String.format( + "Add voter request didn't include the default listener: %s", + channel.listenerName() + ) + ) + ); + } + + return addVoterHandler.handleAddVoterRequest( + quorum.leaderStateOrThrow(), + newVoter.get(), + newVoterEndpoints, + currentTimeMs + ); + } + + private boolean handleApiVersionsResponse( + RaftResponse.Inbound responseMetadata, + long currentTimeMs + ) { + if (!quorum.isLeader()) { + // Not the leader anymore just ignore the API_VERSIONS response + return true; + } + + ApiVersionsResponseData response = (ApiVersionsResponseData) responseMetadata.data(); + + Errors error = Errors.forCode(response.errorCode()); + Optional<ApiVersionsResponseData.SupportedFeatureKey> supportedKraftVersions = + Optional.ofNullable(response.supportedFeatures().find(KRaftVersion.FEATURE_NAME)); + + return addVoterHandler.handleApiVersionsResponse( + quorum.leaderStateOrThrow(), + responseMetadata.source(), + error, + supportedKraftVersions, + currentTimeMs + ); + } + private boolean hasConsistentLeader(int epoch, OptionalInt leaderId) { // Only elected leaders are sent in the request/response header, so if we have an elected // leaderId, it should be consistent with what is in 
the message. @@ -2132,6 +2245,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { handledSuccessfully = handleFetchSnapshotResponse(response, currentTimeMs); break; + case API_VERSIONS: + handledSuccessfully = handleApiVersionsResponse(response, currentTimeMs); + break; + default: throw new IllegalArgumentException("Received unexpected response type: " + apiKey); } @@ -2224,6 +2341,10 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { responseFuture = completedFuture(handleFetchSnapshotRequest(request, currentTimeMs)); break; + case ADD_RAFT_VOTER: + responseFuture = handleAddVoterRequest(request, currentTimeMs); + break; + default: throw new IllegalArgumentException("Unexpected request type " + apiKey); } @@ -2567,6 +2688,8 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { return 0L; } + long timeUtilAddVoterExpires = addVoterHandler.maybeExpirePendingOperation(state, currentTimeMs); + long timeUntilFlush = maybeAppendBatches( state, currentTimeMs @@ -2577,7 +2700,16 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { currentTimeMs ); - return Math.min(timeUntilFlush, Math.min(timeUntilNextBeginQuorumSend, timeUntilCheckQuorumExpires)); + return Math.min( + timeUntilFlush, + Math.min( + timeUntilNextBeginQuorumSend, + Math.min( + timeUntilCheckQuorumExpires, + timeUtilAddVoterExpires + ) + ) + ); } private long maybeSendVoteRequests( @@ -3154,6 +3286,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { } private final class ListenerContext implements CloseListener<BatchReader<T>> { + private static final long STARTING_NEXT_OFFSET = -1; private final RaftClient.Listener<T> listener; // This field is used only by the Raft IO thread private LeaderAndEpoch lastFiredLeaderChange = LeaderAndEpoch.UNKNOWN; @@ -3161,7 +3294,7 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { // These fields are visible to both the Raft IO thread and the listener // and are protected 
through synchronization on this ListenerContext instance private BatchReader<T> lastSent = null; - private long nextOffset = 0; + private long nextOffset = STARTING_NEXT_OFFSET; private ListenerContext(Listener<T> listener) { this.listener = listener; @@ -3175,6 +3308,15 @@ public final class KafkaRaftClient<T> implements RaftClient<T> { return nextOffset; } + /** + * Sets the nextOffset to zero. + * + * This is done when the partition is empty. No log and no bootstraping snapshot. + */ + private synchronized void resetOffsetForEmptyPartition() { + nextOffset = 0; + } + /** * Get the next expected offset, which might be larger than the last acked * offset if there are inflight batches which have not been acked yet. diff --git a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java index 9cdfbbf3d84..fc43713a748 100644 --- a/raft/src/main/java/org/apache/kafka/raft/LeaderState.java +++ b/raft/src/main/java/org/apache/kafka/raft/LeaderState.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.internals.AddVoterHandlerState; import org.apache.kafka.raft.internals.BatchAccumulator; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; @@ -71,6 +72,7 @@ public class LeaderState<T> implements EpochState { private Optional<LogOffsetMetadata> highWatermark = Optional.empty(); private Map<Integer, ReplicaState> voterStates = new HashMap<>(); + private Optional<AddVoterHandlerState> addVoterHandlerState = Optional.empty(); private final Map<ReplicaKey, ReplicaState> observerStates = new HashMap<>(); private final Logger log; private final BatchAccumulator<T> accumulator; @@ -198,7 +200,24 @@ public class LeaderState<T> implements EpochState { } public BatchAccumulator<T> 
accumulator() { - return this.accumulator; + return accumulator; + } + + public Optional<AddVoterHandlerState> addVoterHandlerState() { + return addVoterHandlerState; + } + + public void resetAddVoterHandlerState( + Errors error, + String message, + Optional<AddVoterHandlerState> state + ) { + addVoterHandlerState.ifPresent( + handlerState -> handlerState + .future() + .complete(RaftUtil.addVoterResponse(error, message)) + ); + addVoterHandlerState = state; } private static List<Voter> convertToVoters(Set<Integer> voterIds) { @@ -273,10 +292,29 @@ public class LeaderState<T> implements EpochState { accumulator.forceDrain(); } + public long appendVotersRecord(VoterSet voters, long currentTimeMs) { + return accumulator.appendVotersRecord( + voters.toVotersRecord(ControlRecordUtils.KRAFT_VOTERS_CURRENT_VERSION), + currentTimeMs + ); + } + public boolean isResignRequested() { return resignRequested; } + public boolean isReplicaCaughtUp(ReplicaKey replicaKey, long currentTimeMs) { + // In summary, let's consider a replica caughed up for add voter, if they + // have fetched within the last hour + return Optional.ofNullable(observerStates.get(replicaKey)) + .map(state -> + state.lastCaughtUpTimestamp > 0 && + state.lastFetchTimestamp > 0 && + state.lastFetchTimestamp > currentTimeMs - 3600000 + ) + .orElse(false); + } + public void requestResign() { this.resignRequested = true; } @@ -510,7 +548,7 @@ public class LeaderState<T> implements EpochState { return state; } - private Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) { + public Optional<ReplicaState> getReplicaState(ReplicaKey replicaKey) { ReplicaState state = voterStates.get(replicaKey.id()); if (state == null || !state.matchesKey(replicaKey)) { state = observerStates.get(replicaKey); @@ -749,6 +787,7 @@ public class LeaderState<T> implements EpochState { @Override public void close() { + addVoterHandlerState.ifPresent(AddVoterHandlerState::close); accumulator.close(); } } diff --git 
a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java index c28864b2f2b..839f90c0819 100644 --- a/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java +++ b/raft/src/main/java/org/apache/kafka/raft/RaftUtil.java @@ -18,6 +18,8 @@ package org.apache.kafka.raft; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.DescribeQuorumRequestData; @@ -500,6 +502,31 @@ public class RaftUtil { return response; } + public static AddRaftVoterRequestData addVoterRequest( + String clusterId, + int timeoutMs, + ReplicaKey voter, + Endpoints listeners + ) { + return new AddRaftVoterRequestData() + .setClusterId(clusterId) + .setTimeoutMs(timeoutMs) + .setVoterId(voter.id()) + .setVoterDirectoryId(voter.directoryId().orElse(ReplicaKey.NO_DIRECTORY_ID)) + .setListeners(listeners.toAddVoterRequest()); + } + + public static AddRaftVoterResponseData addVoterResponse( + Errors error, + String errorMessage + ) { + errorMessage = errorMessage == null ? 
error.message() : errorMessage; + + return new AddRaftVoterResponseData() + .setErrorCode(error.code()) + .setErrorMessage(errorMessage); + } + public static Optional<ReplicaKey> voteRequestVoterKey( VoteRequestData request, VoteRequestData.PartitionData partition @@ -522,6 +549,14 @@ public class RaftUtil { } } + public static Optional<ReplicaKey> addVoterRequestVoterKey(AddRaftVoterRequestData request) { + if (request.voterId() < 0) { + return Optional.empty(); + } else { + return Optional.of(ReplicaKey.of(request.voterId(), request.voterDirectoryId())); + } + } + static boolean hasValidTopicPartition(FetchRequestData data, TopicPartition topicPartition, Uuid topicId) { return data.topics().size() == 1 && data.topics().get(0).topicId().equals(topicId) && diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java new file mode 100644 index 00000000000..d040e87ac03 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandler.java @@ -0,0 +1,379 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.message.ApiVersionsRequestData; +import org.apache.kafka.common.message.ApiVersionsResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiVersionsRequest; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.LeaderState; +import org.apache.kafka.raft.LogOffsetMetadata; +import org.apache.kafka.raft.RaftUtil; +import org.apache.kafka.server.common.KRaftVersion; + +import org.slf4j.Logger; + +import java.util.Optional; +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +/** + * This type implement the protocol for adding a voter to a KRaft partition. + * + * The general algorithm for adding a voter to the voter set is: + * + * 1. Wait until there are no uncommitted VotersRecord. Note that the implementation may just + * return a REQUEST_TIMED_OUT error if there are pending operations. + * 2. Wait for the LeaderChangeMessage control record from the current epoch to get committed. + * Note that the implementation may just return a REQUEST_TIMED_OUT error if there are pending + * operations. + * 3. Send an ApiVersions RPC to the first listener to discover the supported kraft.version of the + * new voter. + * 4. Check that the new voter supports the current kraft.version. + * 5. Wait for the fetch offset of the replica (ID, UUID) to catch up to the log end offset of + * the leader. + * 6. Append the updated VotersRecord to the log. + * 7. The KRaft internal listener will read this record from the log and add the new voter to the + * set of voters. + * 8. 
Wait for the VotersRecord to commit using the majority of the new set of voters. Return a + * REQUEST_TIMED_OUT error if it doesn't succeed in time. + * 9. Send the AddVoter response to the client. + * + * The algorithm above could be improved as part of KAFKA-17147. Instead of returning an error + * immediately for 1. and 2., KRaft can wait with a timeout until those invariants become true. + */ +public final class AddVoterHandler { + private final KRaftControlRecordStateMachine partitionState; + private final RequestSender requestSender; + private final Time time; + private final Logger logger; + + public AddVoterHandler( + KRaftControlRecordStateMachine partitionState, + RequestSender requestSender, + Time time, + LogContext logContext + ) { + this.partitionState = partitionState; + this.requestSender = requestSender; + this.time = time; + this.logger = logContext.logger(AddVoterHandler.class); + } + + public CompletableFuture<AddRaftVoterResponseData> handleAddVoterRequest( + LeaderState<?> leaderState, + ReplicaKey voterKey, + Endpoints voterEndpoints, + long currentTimeMs + ) { + // Check if there are any pending add voter requests + if (isOperationPending(leaderState, currentTimeMs)) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.REQUEST_TIMED_OUT, + "Request timed out waiting for leader to handle previous add voter request" + ) + ); + } + + Optional<LogHistory.Entry<VoterSet>> votersEntry = partitionState.lastVoterSetEntry(); + + // Check that the leader has established a HWM and committed the current epoch + Optional<Long> highWatermark = leaderState.highWatermark().map(LogOffsetMetadata::offset); + if (!highWatermark.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.REQUEST_TIMED_OUT, + "Request timed out waiting for leader to establish HWM and fence previous voter changes" + ) + ); + } + + // Check that the cluster supports kraft.version >= 1 + KRaftVersion kraftVersion = 
partitionState.lastKraftVersion(); + if (!kraftVersion.isReconfigSupported()) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.UNSUPPORTED_VERSION, + String.format( + "Cluster doesn't support adding voter because the %s feature is %s", + kraftVersion.featureName(), + kraftVersion.featureLevel() + ) + ) + ); + } + + // Check that there are no uncommitted VotersRecord + if (!votersEntry.isPresent() || votersEntry.get().offset() >= highWatermark.get()) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.REQUEST_TIMED_OUT, + String.format( + "Request timed out waiting for voters to commit the latest voter change at %s with HWM %d", + votersEntry.map(LogHistory.Entry::offset), + highWatermark.get() + ) + ) + ); + } + + // Check that the new voter id is not part of the current voter set + VoterSet voters = votersEntry.get().value(); + if (voters.voterIds().contains(voterKey.id())) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.DUPLICATE_VOTER, + String.format( + "The voter id for %s is already part of the set of voters %s.", + voterKey, + voters.voterKeys() + ) + ) + ); + } + + // Send API_VERSIONS request to new voter to discover their supported kraft.version range + OptionalLong timeout = requestSender.send( + voterEndpoints + .address(requestSender.listenerName()) + .map(address -> new Node(voterKey.id(), address.getHostName(), address.getPort())) + .orElseThrow( + () -> new IllegalArgumentException( + String.format( + "Provided listeners %s do not contain a listener for %s", + voterEndpoints, + requestSender.listenerName() + ) + ) + ), + this::buildApiVersionsRequest, + currentTimeMs + ); + if (!timeout.isPresent()) { + return CompletableFuture.completedFuture( + RaftUtil.addVoterResponse( + Errors.REQUEST_TIMED_OUT, + String.format("New voter %s is not ready to receive requests", voterKey) + ) + ); + } + + AddVoterHandlerState state = new AddVoterHandlerState( + 
voterKey, + voterEndpoints, + time.timer(timeout.getAsLong()) + ); + leaderState.resetAddVoterHandlerState( + Errors.UNKNOWN_SERVER_ERROR, + null, + Optional.of(state) + ); + + return state.future(); + } + + public boolean handleApiVersionsResponse( + LeaderState<?> leaderState, + Node source, + Errors error, + Optional<ApiVersionsResponseData.SupportedFeatureKey> supportedKraftVersions, + long currentTimeMs + ) { + Optional<AddVoterHandlerState> handlerState = leaderState.addVoterHandlerState(); + if (!handlerState.isPresent()) { + // There is no pending add voter operation; just ignore the API_VERSIONS response + return true; + } + + + // Check that the API_VERSIONS response matches the id of the voter getting added + AddVoterHandlerState current = handlerState.get(); + if (!current.expectingApiResponse(source.id())) { + logger.info( + "API_VERSIONS response is not expected from {}: voterKey is {}, lastOffset is {}", + source, + current.voterKey(), + current.lastOffset() + ); + + return true; + } + + // Abort operation if the API_VERSIONS returned an error + if (error != Errors.NONE) { + logger.info( + "Aborting add voter operation for {} at {} since API_VERSIONS returned an error {}", + current.voterKey(), + current.voterEndpoints(), + error + ); + + abortAddVoter( + leaderState, + Errors.REQUEST_TIMED_OUT, + String.format( + "Aborted add voter operation for %s since API_VERSIONS returned an error %s", + current.voterKey(), error + ) + ); + + return false; + } + + // Check that the new voter supports the kraft.version for reconfiguration + if (!validVersionRange(supportedKraftVersions)) { + logger.info( + "Aborting add voter operation for {} at {} since kraft.version range {} doesn't " + + "support reconfiguration", + current.voterKey(), + current.voterEndpoints(), + supportedKraftVersions + ); + + abortAddVoter( + leaderState, + Errors.INVALID_REQUEST, + String.format( + "Aborted add voter operation for %s since kraft.version range %s doesn't " + + "support reconfiguration", + current.voterKey(), +
supportedKraftVersions + .map( + range -> String.format( + "(min: %s, max: %s)", + range.minVersion(), + range.maxVersion() + ) + ) + .orElse("(min: 0, max: 0)") + ) + ); + + return true; + } + + // Check that the new voter is caught up to the LEO to avoid delays in HWM increases + if (!leaderState.isReplicaCaughtUp(current.voterKey(), currentTimeMs)) { + logger.info( + "Aborting add voter operation for {} at {} since it is lagging behind: {}", + current.voterKey(), + current.voterEndpoints(), + leaderState.getReplicaState(current.voterKey()) + ); + + abortAddVoter( + leaderState, + Errors.REQUEST_TIMED_OUT, + String.format( + "Aborted add voter operation for %s since it is lagging behind", + current.voterKey() + ) + ); + + return true; + } + + // Add the new voter to the set of voters and append the record to the log + VoterSet newVoters = partitionState + .lastVoterSet() + .addVoter( + VoterSet.VoterNode.of( + current.voterKey(), + current.voterEndpoints(), + new SupportedVersionRange( + supportedKraftVersions.get().minVersion(), + supportedKraftVersions.get().maxVersion() + ) + ) + ) + .orElseThrow(() -> + new IllegalStateException( + String.format( + "Unable to add %s to the set of voters %s", + current.voterKey(), + partitionState.lastVoterSet() + ) + ) + ); + current.setLastOffset(leaderState.appendVotersRecord(newVoters, currentTimeMs)); + + return true; + } + + public void highWatermarkUpdated(LeaderState<?> leaderState) { + leaderState.addVoterHandlerState().ifPresent(current -> { + leaderState.highWatermark().ifPresent(highWatermark -> { + current.lastOffset().ifPresent(lastOffset -> { + if (highWatermark.offset() > lastOffset) { + // VotersRecord with the added voter was committed; complete the RPC + completeAddVoter(leaderState); + } + }); + }); + }); + } + + public long maybeExpirePendingOperation(LeaderState<?> leaderState, long currentTimeMs) { + long timeUntilOperationExpiration = leaderState + .addVoterHandlerState() + .map(state ->
state.timeUntilOperationExpiration(currentTimeMs)) + .orElse(Long.MAX_VALUE); + + if (timeUntilOperationExpiration == 0) { + abortAddVoter(leaderState, Errors.REQUEST_TIMED_OUT, null); + return Long.MAX_VALUE; + } else { + return timeUntilOperationExpiration; + } + + } + + private ApiVersionsRequestData buildApiVersionsRequest() { + return new ApiVersionsRequestData() + .setClientSoftwareName(ApiVersionsRequest.Builder.DEFAULT_CLIENT_SOFTWARE_NAME) + .setClientSoftwareVersion(AppInfoParser.getVersion()); + } + + private boolean isOperationPending(LeaderState<?> leaderState, long currentTimeMs) { + maybeExpirePendingOperation(leaderState, currentTimeMs); + return leaderState.addVoterHandlerState().isPresent(); + } + + private boolean validVersionRange( + Optional<ApiVersionsResponseData.SupportedFeatureKey> supportedKraftVersions + ) { + return supportedKraftVersions.isPresent() && + (supportedKraftVersions.get().minVersion() <= 1 && + supportedKraftVersions.get().maxVersion() >= 1); + } + + private void completeAddVoter(LeaderState<?> leaderState) { + leaderState.resetAddVoterHandlerState(Errors.NONE, null, Optional.empty()); + } + + private void abortAddVoter(LeaderState<?> leaderState, Errors error, String message) { + leaderState.resetAddVoterHandlerState(error, message, Optional.empty()); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java new file mode 100644 index 00000000000..3b303f2a873 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/AddVoterHandlerState.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.Timer; +import org.apache.kafka.raft.Endpoints; +import org.apache.kafka.raft.RaftUtil; + +import java.util.OptionalLong; +import java.util.concurrent.CompletableFuture; + +public final class AddVoterHandlerState implements AutoCloseable { + private final ReplicaKey voterKey; + private final Endpoints voterEndpoints; + private final Timer timeout; + private final CompletableFuture<AddRaftVoterResponseData> future = new CompletableFuture<>(); + + private OptionalLong lastOffset = OptionalLong.empty(); + + AddVoterHandlerState( + ReplicaKey voterKey, + Endpoints voterEndpoints, + Timer timeout + ) { + this.voterKey = voterKey; + this.voterEndpoints = voterEndpoints; + this.timeout = timeout; + } + + public long timeUntilOperationExpiration(long currentTimeMs) { + timeout.update(currentTimeMs); + return timeout.remainingMs(); + } + + public boolean expectingApiResponse(int replicaId) { + return !lastOffset.isPresent() && replicaId == voterKey.id(); + } + + public void setLastOffset(long lastOffset) { + if (this.lastOffset.isPresent()) { + throw new IllegalStateException( + String.format( + "Cannot override last offset to %s for adding voter %s because it is " + + "already set to %s", + 
lastOffset, + voterKey, + this.lastOffset.getAsLong() + ) + ); + } + + this.lastOffset = OptionalLong.of(lastOffset); + } + + public ReplicaKey voterKey() { + return voterKey; + } + + public Endpoints voterEndpoints() { + return voterEndpoints; + } + + public OptionalLong lastOffset() { + return lastOffset; + } + + public CompletableFuture<AddRaftVoterResponseData> future() { + return future; + } + + @Override + public void close() { + future.complete(RaftUtil.addVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER, null)); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java index 09204812ae6..ff4de9b4a56 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.memory.MemoryPool; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.SnapshotFooterRecord; import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.VotersRecord; import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MutableRecordBatch; @@ -225,8 +226,9 @@ public class BatchAccumulator<T> implements Closeable { * @param valueCreator a function that uses the passed buffer to create the control * batch that will be appended. The memory records returned must contain one * control batch and that control batch have at least one record.
+ * @return the last offset of the records created */ - public void appendControlMessages(MemoryRecordsCreator valueCreator) { + public long appendControlMessages(MemoryRecordsCreator valueCreator) { appendLock.lock(); try { ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize); @@ -260,6 +262,8 @@ public class BatchAccumulator<T> implements Closeable { } else { throw new IllegalStateException("Could not allocate buffer for the control record"); } + + return nextOffset - 1; } finally { appendLock.unlock(); } @@ -303,6 +307,30 @@ public class BatchAccumulator<T> implements Closeable { return numberOfRecords; } + /** + * Append a {@link VotersRecord} record to the batch + * + * @param voters the record to append + * @param currentTimestamp the current time in milliseconds + * @return the last offset of the records created + * @throws IllegalStateException on failure to allocate a buffer for the record + */ + public long appendVotersRecord( + VotersRecord voters, + long currentTimestamp + ) { + return appendControlMessages((baseOffset, epoch, compression, buffer) -> + MemoryRecords.withVotersRecord( + baseOffset, + currentTimestamp, + epoch, + buffer, + voters + ) + ); + } + + /** * Append a {@link LeaderChangeMessage} record to the batch * diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java new file mode 100644 index 00000000000..0cee3c255d1 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/DefaultRequestSender.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.raft.NetworkChannel; +import org.apache.kafka.raft.RaftMessageQueue; +import org.apache.kafka.raft.RaftRequest; +import org.apache.kafka.raft.RaftResponse; +import org.apache.kafka.raft.RaftUtil; +import org.apache.kafka.raft.RequestManager; + +import org.slf4j.Logger; + +import java.util.OptionalLong; +import java.util.function.Supplier; + +public final class DefaultRequestSender implements RequestSender { + private final RequestManager requestManager; + private final NetworkChannel channel; + private final RaftMessageQueue messageQueue; + private final Logger logger; + + public DefaultRequestSender( + RequestManager requestManager, + NetworkChannel channel, + RaftMessageQueue messageQueue, + LogContext logContext + ) { + this.requestManager = requestManager; + this.channel = channel; + this.messageQueue = messageQueue; + this.logger = logContext.logger(DefaultRequestSender.class); + } + + @Override + public ListenerName listenerName() { + return channel.listenerName(); + } + + @Override + public OptionalLong send( + Node destination, + Supplier<ApiMessage> requestSupplier, + long currentTimeMs + ) { + if (requestManager.isBackingOff(destination, currentTimeMs)) { + long remainingBackoffMs 
= requestManager.remainingBackoffMs(destination, currentTimeMs); + logger.debug("Connection for {} is backing off for {} ms", destination, remainingBackoffMs); + return OptionalLong.empty(); + } + + if (!requestManager.isReady(destination, currentTimeMs)) { + long remainingMs = requestManager.remainingRequestTimeMs(destination, currentTimeMs); + logger.debug("Connection for {} has a pending request for {} ms", destination, remainingMs); + return OptionalLong.empty(); + } + + int correlationId = channel.newCorrelationId(); + ApiMessage request = requestSupplier.get(); + + RaftRequest.Outbound requestMessage = new RaftRequest.Outbound( + correlationId, + request, + destination, + currentTimeMs + ); + + requestMessage.completion.whenComplete((response, exception) -> { + if (exception != null) { + ApiKeys api = ApiKeys.forId(request.apiKey()); + Errors error = Errors.forException(exception); + ApiMessage errorResponse = RaftUtil.errorResponse(api, error); + + response = new RaftResponse.Inbound( + correlationId, + errorResponse, + destination + ); + } + + messageQueue.add(response); + }); + + requestManager.onRequestSent(destination, correlationId, currentTimeMs); + channel.send(requestMessage); + logger.trace("Sent outbound request: {}", requestMessage); + + return OptionalLong.of(requestManager.remainingRequestTimeMs(destination, currentTimeMs)); + } +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java index 59663b54204..6c84935b23f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java @@ -65,7 +65,7 @@ public final class KRaftControlRecordStateMachine { // // 2. 
The read operations lastVoterSet, voterSetAtOffset and kraftVersionAtOffset read // the nextOffset first before reading voterSetHistory or kraftVersionHistory - private volatile long nextOffset = 0; + private volatile long nextOffset = -1; /** * Constructs an internal log listener @@ -138,6 +138,15 @@ public final class KRaftControlRecordStateMachine { } } + /** + * Return the latest entry for the set of voters. + */ + public Optional<LogHistory.Entry<VoterSet>> lastVoterSetEntry() { + synchronized (voterSetHistory) { + return voterSetHistory.lastEntry(); + } + } + /** * Returns the offset of the last voter set. */ @@ -221,7 +230,7 @@ public final class KRaftControlRecordStateMachine { } private void maybeLoadSnapshot() { - if ((nextOffset == 0 || nextOffset < log.startOffset()) && log.latestSnapshot().isPresent()) { + if ((nextOffset == -1 || nextOffset < log.startOffset()) && log.latestSnapshot().isPresent()) { RawSnapshotReader rawSnapshot = log.latestSnapshot().get(); // Clear the current state synchronized (kraftVersionHistory) { @@ -254,6 +263,10 @@ public final class KRaftControlRecordStateMachine { nextOffset = reader.lastContainedLogOffset() + 1; } + } else if (nextOffset == -1) { + // Listener just started and there are no snapshots; set the nextOffset to + // 0 to start reading the log + nextOffset = 0; } } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java new file mode 100644 index 00000000000..d30b259c7d8 --- /dev/null +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RequestSender.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.raft.internals; + +import org.apache.kafka.common.Node; +import org.apache.kafka.common.network.ListenerName; +import org.apache.kafka.common.protocol.ApiMessage; + +import java.util.OptionalLong; +import java.util.function.Supplier; + +/** + * Interface for sending KRaft requests. + * + * Responsible for managing the connection state and sending requests when the connection is + * available. + */ +interface RequestSender { + /** + * The name of the listener used for sending requests. + * + * This is generally the default (first) listener. + */ + ListenerName listenerName(); + + /** + * Send a KRaft request to the destination.
+ * + * @param destination the destination for the request + * @param requestSupplier the supplier that creates the request + * @param currentTimeMs the current time + * @return the request timeout if the request was sent; otherwise {@code OptionalLong.empty()} + */ + OptionalLong send( + Node destination, + Supplier<ApiMessage> requestSupplier, + long currentTimeMs + ); +} diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java index 2f961897982..d5de736f7b4 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java @@ -363,6 +363,14 @@ public final class VoterSet { supportedKRaftVersion ); } + + public static VoterNode of( + ReplicaKey voterKey, + Endpoints listeners, + SupportedVersionRange supportedKRaftVersion + ) { + return new VoterNode(voterKey, listeners, supportedKRaftVersion); + } } /** diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java index 1b630c3e22c..4732d6e799f 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java @@ -92,6 +92,13 @@ public final class VoterSetHistory { .orElseThrow(() -> new IllegalStateException("No voter set found")); } + /** + * Return the latest entry for the set of voters. + */ + public Optional<LogHistory.Entry<VoterSet>> lastEntry() { + return votersHistory.lastEntry(); + } + /** * Returns the offset of the last voter set stored in the partition history.
* diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java index 84a9af918b6..5cdcfe5132c 100644 --- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java @@ -16,12 +16,16 @@ */ package org.apache.kafka.raft; +import org.apache.kafka.common.Node; import org.apache.kafka.common.compress.Compression; +import org.apache.kafka.common.feature.SupportedVersionRange; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.KRaftVersionRecord; import org.apache.kafka.common.message.LeaderChangeMessage; import org.apache.kafka.common.message.SnapshotFooterRecord; import org.apache.kafka.common.message.SnapshotHeaderRecord; import org.apache.kafka.common.message.VotersRecord; +import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.ControlRecordType; import org.apache.kafka.common.record.ControlRecordUtils; @@ -35,18 +39,22 @@ import org.apache.kafka.common.utils.BufferSupplier; import org.apache.kafka.raft.internals.ReplicaKey; import org.apache.kafka.raft.internals.VoterSet; import org.apache.kafka.raft.internals.VoterSetTest; +import org.apache.kafka.server.common.KRaftVersion; import org.apache.kafka.snapshot.RecordsSnapshotReader; import org.apache.kafka.snapshot.SnapshotReader; import org.apache.kafka.snapshot.SnapshotWriterReaderTest; import org.junit.jupiter.api.Test; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Optional; import java.util.OptionalInt; +import java.util.concurrent.ThreadLocalRandom; import java.util.stream.Stream; import static org.apache.kafka.raft.KafkaRaftClientTest.replicaKey; 
@@ -55,12 +63,14 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +// KAFKA-16537 will add tests for testAddVoterWithPendingRemoveVoter +// and testRemoveVoterWithPendingAddVoter public class KafkaRaftClientReconfigTest { @Test public void testLeaderWritesBootstrapRecords() throws Exception { - ReplicaKey local = replicaKey(0, true); - ReplicaKey follower = replicaKey(1, true); + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); @@ -134,8 +144,8 @@ public class KafkaRaftClientReconfigTest { @Test public void testBootstrapCheckpointIsNotReturnedOnFetch() throws Exception { - ReplicaKey local = replicaKey(0, true); - ReplicaKey follower = replicaKey(1, true); + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); @@ -164,13 +174,13 @@ public class KafkaRaftClientReconfigTest { @Test public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws Exception { - ReplicaKey local = replicaKey(0, true); - ReplicaKey follower = replicaKey(1, true); + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) - .withStaticVoters(voters.voterIds()) + .withStaticVoters(voters) .withBootstrapSnapshot(Optional.empty()) .withUnknownLeader(0) .build(); @@ -226,8 +236,8 @@ public class KafkaRaftClientReconfigTest { @Test public void testFollowerDoesNotRequestLeaderBootstrapSnapshot() throws Exception { - ReplicaKey local = replicaKey(0, true); - ReplicaKey 
leader = replicaKey(1, true); + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey leader = replicaKey(local.id() + 1, true); int epoch = 1; VoterSet voters = VoterSetTest.voterSet(Stream.of(local, leader)); @@ -256,9 +266,9 @@ public class KafkaRaftClientReconfigTest { @Test public void testFollowerReadsKRaftBootstrapRecords() throws Exception { - ReplicaKey local = replicaKey(0, true); - ReplicaKey leader = replicaKey(1, true); - ReplicaKey follower = replicaKey(2, true); + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey leader = replicaKey(local.id() + 1, true); + ReplicaKey follower = replicaKey(local.id() + 2, true); VoterSet voterSet = VoterSetTest.voterSet(Stream.of(local, leader)); int epoch = 5; @@ -324,6 +334,609 @@ public class KafkaRaftClientReconfigTest { assertTrue(context.client.quorum().isVoter(follower)); } + @Test + public void testAddVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Show that the new voter is not currently a voter + assertFalse(context.client.quorum().isVoter(newVoter)); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + 
context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Leader should send an API_VERSIONS request to the new voter's endpoint + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response with supported kraft.version + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.NONE) + ); + + // Handle the API_VERSIONS response + context.client.poll(); + // Append new VotersRecord to log + context.client.poll(); + // The new voter is now a voter after writing the VotersRecord to the log + assertTrue(context.client.quorum().isVoter(newVoter)); + + // Send a FETCH to increase the HWM and commit the new voter set + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Expect reply for AddVoter request + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.NONE); + } + + @Test + void testAddVoterInvalidClusterId() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters =
VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // empty cluster id is rejected + context.deliverRequest(context.addVoterRequest("", Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + + // invalid cluster id is rejected + context.deliverRequest(context.addVoterRequest("invalid-uuid", Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.INCONSISTENT_CLUSTER_ID); + } + + @Test + void testAddVoterToNotLeader() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, 
newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.NOT_LEADER_OR_FOLLOWER); + } + + @Test + void testAddVoterWithMissingDefaultListener() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(ListenerName.normalised("not_the_default_listener"), newAddress) + ); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.INVALID_REQUEST); + } + + @Test + void testAddVoterWithPendingAddVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + 
Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Attempting to add another voter should be an error + ReplicaKey anotherNewVoter = replicaKey(local.id() + 3, true); + InetSocketAddress anotherNewAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + anotherNewVoter.id() + ); + Endpoints anotherNewListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), anotherNewAddress) + ); + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, anotherNewVoter, anotherNewListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testAddVoterWithoutFencedPreviousLeaders() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, 
true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testAddVoterWithKraftVersion0() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withStaticVoters(voters) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + 
context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.UNSUPPORTED_VERSION); + } + + @Test + void testAddVoterWithExistingVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(follower.id(), true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter with the same id as an existing voter + 
context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.DUPLICATE_VOTER); + } + + @Test + void testAddVoterTimeout() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Show that the new voter is not currently a voter + assertFalse(context.client.quorum().isVoter(newVoter)); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Leader should send an API_VERSIONS request to the new voter's endpoint + context.pollUntilRequest(); + RaftRequest.Outbound 
apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response with supported kraft.version + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.NONE) + ); + + // Handle the the API_VERSIONS response + context.client.poll(); + // Append new VotersRecord to log + context.client.poll(); + // The new voter is now a voter after writing the VotersRecord to the log + assertTrue(context.client.quorum().isVoter(newVoter)); + + context.time.sleep(context.requestTimeoutMs()); + + // Expect the AddVoter RPC to timeout + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testAddVoterWithApiVersionsFromIncorrectNode() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, 
OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Leader should send an API_VERSIONS request to the new voter's endpoint + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response with supported kraft.version + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.INVALID_REQUEST) + ); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); + } + + @Test + void testAddVoterInvalidFeatureVersion() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders 
+ context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Catch up the new voter to the leader's LEO + context.deliverRequest( + context.fetchRequest(epoch, newVoter, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Leader should send an API_VERSIONS request to the new voter's endpoint + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response with supported kraft.version + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.NONE, new SupportedVersionRange((short) 0)) + ); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.INVALID_REQUEST); + } + + @Test + void testAddVoterWithLaggingNewVoter() throws Exception { + ReplicaKey local = replicaKey(randomeReplicaId(), true); + ReplicaKey follower = replicaKey(local.id() + 1, true); + + VoterSet voters = VoterSetTest.voterSet(Stream.of(local, follower)); + + RaftClientTestContext context = new RaftClientTestContext.Builder(local.id(), local.directoryId().get()) + .withKip853Rpc(true) + .withBootstrapSnapshot(Optional.of(voters)) + .withUnknownLeader(3) + .build(); + + context.becomeLeader(); + int epoch = context.currentEpoch(); + + ReplicaKey newVoter = replicaKey(local.id() + 2, true); + InetSocketAddress newAddress = 
InetSocketAddress.createUnresolved( + "localhost", + 9990 + newVoter.id() + ); + Endpoints newListeners = Endpoints.fromInetSocketAddresses( + Collections.singletonMap(context.channel.listenerName(), newAddress) + ); + + // Establish a HWM and fence previous leaders + context.deliverRequest( + context.fetchRequest(epoch, follower, context.log.endOffset().offset(), epoch, 0) + ); + context.pollUntilResponse(); + context.assertSentFetchPartitionResponse(Errors.NONE, epoch, OptionalInt.of(local.id())); + + // Attempt to add new voter to the quorum + context.deliverRequest(context.addVoterRequest(Integer.MAX_VALUE, newVoter, newListeners)); + + // Leader should send an API_VERSIONS request to the new voter's endpoint + context.pollUntilRequest(); + RaftRequest.Outbound apiVersionRequest = context.assertSentApiVersionsRequest(); + assertEquals( + new Node(newVoter.id(), newAddress.getHostString(), newAddress.getPort()), + apiVersionRequest.destination() + ); + + // Reply with API_VERSIONS response with supported kraft.version + context.deliverResponse( + apiVersionRequest.correlationId(), + apiVersionRequest.destination(), + apiVersionsResponse(Errors.NONE) + ); + context.pollUntilResponse(); + context.assertSentAddVoterResponse(Errors.REQUEST_TIMED_OUT); + } + private static void verifyVotersRecord( VoterSet expectedVoterSet, ByteBuffer recordKey, @@ -346,4 +959,30 @@ public class KafkaRaftClientReconfigTest { KRaftVersionRecord kRaftVersionRecord = ControlRecordUtils.deserializeKRaftVersionRecord(recordValue); assertEquals(expectedKRaftVersion, kRaftVersionRecord.kRaftVersion()); } + + private int randomeReplicaId() { + return ThreadLocalRandom.current().nextInt(1025); + } + + private static ApiVersionsResponseData apiVersionsResponse(Errors error) { + return apiVersionsResponse(error, new SupportedVersionRange((short) 0, (short) 1)); + } + + private static ApiVersionsResponseData apiVersionsResponse(Errors error, SupportedVersionRange supportedVersions) { + 
ApiVersionsResponseData.SupportedFeatureKeyCollection supportedFeatures = + new ApiVersionsResponseData.SupportedFeatureKeyCollection(1); + + if (supportedVersions.max() > 0) { + supportedFeatures.add( + new ApiVersionsResponseData.SupportedFeatureKey() + .setName(KRaftVersion.FEATURE_NAME) + .setMinVersion(supportedVersions.min()) + .setMaxVersion(supportedVersions.max()) + ); + } + + return new ApiVersionsResponseData() + .setErrorCode(error.code()) + .setSupportedFeatures(supportedFeatures); + } } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java index 60147c07888..0f8f01876e1 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java @@ -21,6 +21,9 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.Uuid; import org.apache.kafka.common.compress.Compression; import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.message.AddRaftVoterRequestData; +import org.apache.kafka.common.message.AddRaftVoterResponseData; +import org.apache.kafka.common.message.ApiVersionsResponseData; import org.apache.kafka.common.message.BeginQuorumEpochRequestData; import org.apache.kafka.common.message.BeginQuorumEpochResponseData; import org.apache.kafka.common.message.DescribeQuorumRequestData; @@ -300,12 +303,16 @@ public final class RaftClientTestContext { Collectors.toMap(Function.identity(), RaftClientTestContext::mockAddress) ); - this.startingVoters = Optional.of( + return withStaticVoters( VoterSet.fromInetSocketAddresses( MockNetworkChannel.LISTENER_NAME, staticVoterAddressMap ) ); + } + + Builder withStaticVoters(VoterSet staticVoters) { + this.startingVoters = Optional.of(staticVoters); this.isStartingVotersStatic = true; return this; @@ -1081,6 +1088,26 @@ public final class RaftClientTestContext { return result; } + 
RaftRequest.Outbound assertSentApiVersionsRequest() { + List<RaftRequest.Outbound> sentRequests = channel.drainSentRequests(Optional.of(ApiKeys.API_VERSIONS)); + assertEquals(1, sentRequests.size()); + + return sentRequests.get(0); + } + + AddRaftVoterResponseData assertSentAddVoterResponse(Errors error) { + List<RaftResponse.Outbound> sentResponses = drainSentResponses(ApiKeys.ADD_RAFT_VOTER); + assertEquals(1, sentResponses.size()); + + RaftResponse.Outbound response = sentResponses.get(0); + assertInstanceOf(AddRaftVoterResponseData.class, response.data()); + + AddRaftVoterResponseData addVoterResponse = (AddRaftVoterResponseData) response.data(); + assertEquals(error, Errors.forCode(addVoterResponse.errorCode())); + + return addVoterResponse; + } + List<RaftRequest.Outbound> collectEndQuorumRequests( int epoch, Set<Integer> destinationIdSet, @@ -1339,12 +1366,8 @@ public final class RaftClientTestContext { voters .stream() .map(voterId -> new Voter().setVoterId(voterId)) - .collect(Collectors.toList()), - leaderChangeMessage - .voters() - .stream() - .sorted(Comparator.comparingInt(Voter::voterId)) - .collect(Collectors.toList()) + .collect(Collectors.toSet()), + new HashSet<>(leaderChangeMessage.voters()) ); assertEquals( grantingVoters @@ -1545,6 +1568,34 @@ public final class RaftClientTestContext { return RaftUtil.singletonDescribeQuorumRequest(metadataPartition); } + AddRaftVoterRequestData addVoterRequest( + int timeoutMs, + ReplicaKey voter, + Endpoints endpoints + ) { + return addVoterRequest( + clusterId.toString(), + timeoutMs, + voter, + endpoints + ); + } + + AddRaftVoterRequestData addVoterRequest( + String clusterId, + int timeoutMs, + ReplicaKey voter, + Endpoints endpoints + ) { + return RaftUtil.addVoterRequest( + clusterId, + timeoutMs, + voter, + endpoints + ); + } + + private short fetchRpcVersion() { if (kip853Rpc) { return 17; @@ -1593,6 +1644,15 @@ public final class RaftClientTestContext { } } + private short addVoterRpcVersion() { + if 
(kip853Rpc) { + return 0; + } else { + throw new IllegalStateException("Reconfiguration must be enabled by calling withKip853Rpc(true)"); + } + } + + private short raftRequestVersion(ApiMessage request) { if (request instanceof FetchRequestData) { return fetchRpcVersion(); @@ -1606,6 +1666,8 @@ public final class RaftClientTestContext { return endQuorumEpochRpcVersion(); } else if (request instanceof DescribeQuorumRequestData) { return describeQuorumRpcVersion(); + } else if (request instanceof AddRaftVoterRequestData) { + return addVoterRpcVersion(); } else { throw new IllegalArgumentException(String.format("Request %s is not a raft request", request)); } @@ -1624,6 +1686,10 @@ public final class RaftClientTestContext { return endQuorumEpochRpcVersion(); } else if (response instanceof DescribeQuorumResponseData) { return describeQuorumRpcVersion(); + } else if (response instanceof AddRaftVoterResponseData) { + return addVoterRpcVersion(); + } else if (response instanceof ApiVersionsResponseData) { + return 4; } else { throw new IllegalArgumentException(String.format("Request %s is not a raft response", response)); }
