This is an automated email from the ASF dual-hosted git repository. junrao pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 269b652 KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848) 269b652 is described below commit 269b65279c746bc54c611141a5a6509f9b310f11 Author: Tom Bentley <tombent...@users.noreply.github.com> AuthorDate: Fri Jan 25 22:06:18 2019 +0000 KAFKA-5692: Change PreferredReplicaLeaderElectionCommand to use Admin… (#3848) See also KIP-183. This implements the following algorithm: AdminClient sends ElectPreferredLeadersRequest. KafkaApis receives ElectPreferredLeadersRequest and delegates to ReplicaManager.electPreferredLeaders(). ReplicaManager delegates to KafkaController.electPreferredLeaders(). KafkaController adds a PreferredReplicaLeaderElection to the EventManager. ReplicaManager.electPreferredLeaders()'s callback uses the delayedElectPreferredReplicasPurgatory to wait for the results of the election to appear in the metadata cache. If there are no results because of errors, or because the preferred leaders are already leading the partitions, then a response is returned immediately. In the EventManager work thread the preferred leader is elected as follows: The EventManager runs PreferredReplicaLeaderElection.process(). process() calls KafkaController.onPreferredReplicaElectionWithResults(). KafkaController.onPreferredReplicaElectionWithResults() calls the PartitionStateMachine.handleStateChangesWithResults() to perform the election (asynchronously the PSM will send LeaderAndIsrRequest to the new and old leaders and UpdateMetadataRequest to all brokers), then invokes the callback. Reviewers: Colin P. 
McCabe <cmcc...@apache.org>, Jun Rao <jun...@gmail.com> --- checkstyle/import-control.xml | 2 + .../org/apache/kafka/clients/NetworkClient.java | 6 +- .../apache/kafka/clients/admin/AdminClient.java | 52 ++++ .../admin/ElectPreferredLeadersOptions.java | 31 ++ .../clients/admin/ElectPreferredLeadersResult.java | 136 +++++++++ .../kafka/clients/admin/KafkaAdminClient.java | 33 ++ .../PreferredLeaderNotAvailableException.java | 28 ++ .../org/apache/kafka/common/protocol/ApiKeys.java | 14 +- .../org/apache/kafka/common/protocol/Errors.java | 5 +- .../kafka/common/requests/AbstractRequest.java | 2 + .../kafka/common/requests/AbstractResponse.java | 4 +- .../requests/ElectPreferredLeadersRequest.java | 129 ++++++++ .../requests/ElectPreferredLeadersResponse.java | 78 +++++ .../message/ElectPreferredLeadersRequest.json | 33 ++ .../message/ElectPreferredLeadersResponse.json | 39 +++ .../src/main/resources/common/message/README.md | 2 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 54 ++++ .../kafka/clients/admin/MockAdminClient.java | 4 + .../kafka/common/requests/RequestContextTest.java | 3 +- .../kafka/common/requests/RequestResponseTest.java | 40 ++- .../PreferredReplicaLeaderElectionCommand.scala | 217 +++++++++++-- .../kafka/controller/ControllerEventManager.scala | 5 + .../scala/kafka/controller/KafkaController.scala | 159 ++++++++-- .../kafka/controller/PartitionStateMachine.scala | 40 ++- .../kafka/server/DelayedElectPreferredLeader.scala | 89 ++++++ core/src/main/scala/kafka/server/KafkaApis.scala | 49 +++ .../main/scala/kafka/server/MetadataCache.scala | 6 + .../main/scala/kafka/server/ReplicaManager.scala | 37 +++ .../kafka/api/AdminClientIntegrationTest.scala | 203 ++++++++++++- .../kafka/api/AuthorizerIntegrationTest.scala | 20 +- ...PreferredReplicaLeaderElectionCommandTest.scala | 337 +++++++++++++++++++++ .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../scala/unit/kafka/server/KafkaApisTest.scala | 2 +- 
.../unit/kafka/server/ReplicaManagerTest.scala | 8 +- .../scala/unit/kafka/server/RequestQuotaTest.scala | 11 + 35 files changed, 1798 insertions(+), 82 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index a0bf740..0d316c5 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -116,6 +116,7 @@ <subpackage name="protocol"> <allow pkg="org.apache.kafka.common.errors" /> + <allow pkg="org.apache.kafka.common.message" /> <allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.record" /> <allow pkg="org.apache.kafka.common.requests" /> @@ -140,6 +141,7 @@ <subpackage name="requests"> <allow pkg="org.apache.kafka.common.acl" /> <allow pkg="org.apache.kafka.common.protocol" /> + <allow pkg="org.apache.kafka.common.message" /> <allow pkg="org.apache.kafka.common.network" /> <allow pkg="org.apache.kafka.common.requests" /> <allow pkg="org.apache.kafka.common.resource" /> diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 144987e..3973701 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -676,7 +676,8 @@ public class NetworkClient implements KafkaClient { public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0); - return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct); + return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct, + requestHeader.apiVersion()); } private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, @@ -811,7 +812,8 @@ public class NetworkClient implements KafkaClient { req.header.apiKey(), 
req.header.correlationId(), responseStruct); } // If the received response includes a throttle delay, throttle the connection. - AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct); + AbstractResponse body = AbstractResponse. + parseResponse(req.header.apiKey(), responseStruct, req.header.apiVersion()); maybeThrottle(body, req.header.apiVersion(), req.destination, now); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java index bdd7cc3..b823cdc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClient.java @@ -793,6 +793,58 @@ public abstract class AdminClient implements AutoCloseable { } /** + * Elect the preferred broker of the given {@code partitions} as leader, or + * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null. + * + * This is a convenience method for {@link #electPreferredLeaders(Collection, ElectPreferredLeadersOptions)} + * with default options. + * See the overload for more details. + * + * @param partitions The partitions for which the preferred leader should be elected. + * @return The ElectPreferredLeadersResult. + */ + public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions) { + return electPreferredLeaders(partitions, new ElectPreferredLeadersOptions()); + } + + /** + * Elect the preferred broker of the given {@code partitions} as leader, or + * elect the preferred broker for all partitions as leader if the argument to {@code partitions} is null. + * + * This operation is not transactional so it may succeed for some partitions while fail for others. 
+ * + * It may take several seconds after this method returns + * success for all the brokers in the cluster to become aware that the partitions have new leaders. + * During this time, {@link AdminClient#describeTopics(Collection)} + * may not return information about the partitions' new leaders. + * + * This operation is supported by brokers with version 2.2.0 or higher. + * + * <p>The following exceptions can be anticipated when calling {@code get()} on the futures obtained from + * the returned {@code ElectPreferredLeadersResult}:</p> + * <ul> + * <li>{@link org.apache.kafka.common.errors.ClusterAuthorizationException} + * if the authenticated user didn't have alter access to the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.UnknownTopicOrPartitionException} + * if the topic or partition did not exist within the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.InvalidTopicException} + * if the topic was already queued for deletion.</li> + * <li>{@link org.apache.kafka.common.errors.NotControllerException} + * if the request was sent to a broker that was not the controller for the cluster.</li> + * <li>{@link org.apache.kafka.common.errors.TimeoutException} + * if the request timed out before the election was complete.</li> + * <li>{@link org.apache.kafka.common.errors.LeaderNotAvailableException} + * if the preferred leader was not alive or not in the ISR.</li> + * </ul> + * + * @param partitions The partitions for which the preferred leader should be elected. + * @param options The options to use when electing the preferred leaders. + * @return The ElectPreferredLeadersResult. + */ + public abstract ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, + ElectPreferredLeadersOptions options); + + /** * Get the metrics kept by the adminClient */ public abstract Map<MetricName, ? 
extends Metric> metrics(); diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java new file mode 100644 index 0000000..80b0097 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersOptions.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.annotation.InterfaceStability; + +import java.util.Collection; + +/** + * Options for {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)}. + * + * The API of this class is evolving, see {@link AdminClient} for details. 
+ */ +@InterfaceStability.Evolving +public class ElectPreferredLeadersOptions extends AbstractOptions<ElectPreferredLeadersOptions> { +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java new file mode 100644 index 0000000..c76336a --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/clients/admin/ElectPreferredLeadersResult.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.clients.admin; + +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.internals.KafkaFutureImpl; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; + +import java.util.Collection; +import java.util.Map; +import java.util.Set; + +/** + * The result of {@link AdminClient#electPreferredLeaders(Collection, ElectPreferredLeadersOptions)} + * + * The API of this class is evolving, see {@link AdminClient} for details. + */ +@InterfaceStability.Evolving +public class ElectPreferredLeadersResult { + + private final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture; + private final Set<TopicPartition> partitions; + + ElectPreferredLeadersResult(KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture, Set<TopicPartition> partitions) { + this.electionFuture = electionFuture; + this.partitions = partitions; + } + + /** + * Get the result of the election for the given {@code partition}. + * If there was not an election triggered for the given {@code partition}, the + * returned future will complete with an error. 
+ */ + public KafkaFuture<Void> partitionResult(final TopicPartition partition) { + final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); + electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() { + @Override + public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (!topicPartitions.containsKey(partition)) { + result.completeExceptionally(new UnknownTopicOrPartitionException( + "Preferred leader election for partition \"" + partition + + "\" was not attempted")); + } else { + if (partitions == null && topicPartitions.isEmpty()) { + result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + } + ApiException exception = topicPartitions.get(partition).exception(); + if (exception == null) { + result.complete(null); + } else { + result.completeExceptionally(exception); + } + } + } + }); + return result; + } + + /** + * <p>Get a future for the topic partitions for which a leader election + * was attempted. 
A partition will be present in this result if + * an election was attempted even if the election was not successful.</p> + * + * <p>This method is provided to discover the partitions attempted when + * {@link AdminClient#electPreferredLeaders(Collection)} is called + * with a null {@code partitions} argument.</p> + */ + public KafkaFuture<Set<TopicPartition>> partitions() { + if (partitions != null) { + return KafkaFutureImpl.completedFuture(this.partitions); + } else { + final KafkaFutureImpl<Set<TopicPartition>> result = new KafkaFutureImpl<>(); + electionFuture.whenComplete(new KafkaFuture.BiConsumer<Map<TopicPartition, ApiError>, Throwable>() { + @Override + public void accept(Map<TopicPartition, ApiError> topicPartitions, Throwable throwable) { + if (throwable != null) { + result.completeExceptionally(throwable); + } else if (topicPartitions.isEmpty()) { + result.completeExceptionally(Errors.CLUSTER_AUTHORIZATION_FAILED.exception()); + } else { + for (ApiError apiError : topicPartitions.values()) { + if (apiError.isFailure()) { + result.completeExceptionally(apiError.exception()); + } + } + result.complete(topicPartitions.keySet()); + } + } + }); + return result; + } + } + + /** + * Return a future which succeeds if all the topic elections succeed. 
+ */ + public KafkaFuture<Void> all() { + final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>(); + electionFuture.thenApply(new KafkaFuture.Function<Map<TopicPartition, ApiError>, Void>() { + @Override + public Void apply(Map<TopicPartition, ApiError> topicPartitions) { + for (ApiError apiError : topicPartitions.values()) { + if (apiError.isFailure()) { + result.completeExceptionally(apiError.exception()); + return null; + } + } + result.complete(null); + return null; + } + }); + return result; + } +} diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 2ba3cf2..58baab7 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -105,6 +105,8 @@ import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.ElectPreferredLeadersRequest; +import org.apache.kafka.common.requests.ElectPreferredLeadersResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; import org.apache.kafka.common.requests.ExpireDelegationTokenResponse; import org.apache.kafka.common.requests.FindCoordinatorRequest; @@ -2777,4 +2779,35 @@ public class KafkaAdminClient extends AdminClient { public Map<MetricName, ? extends Metric> metrics() { return Collections.unmodifiableMap(this.metrics.metrics()); } + + @Override + public ElectPreferredLeadersResult electPreferredLeaders(final Collection<TopicPartition> partitions, + ElectPreferredLeadersOptions options) { + final Set<TopicPartition> partitionSet = partitions != null ? 
new HashSet<>(partitions) : null; + final KafkaFutureImpl<Map<TopicPartition, ApiError>> electionFuture = new KafkaFutureImpl<>(); + final long now = time.milliseconds(); + runnable.call(new Call("electPreferredLeaders", calcDeadlineMs(now, options.timeoutMs()), + new ControllerNodeProvider()) { + + @Override + public AbstractRequest.Builder createRequest(int timeoutMs) { + return new ElectPreferredLeadersRequest.Builder( + ElectPreferredLeadersRequest.toRequestData(partitions, timeoutMs)); + } + + @Override + public void handleResponse(AbstractResponse abstractResponse) { + ElectPreferredLeadersResponse response = (ElectPreferredLeadersResponse) abstractResponse; + electionFuture.complete( + ElectPreferredLeadersRequest.fromResponseData(response.data())); + } + + @Override + void handleFailure(Throwable throwable) { + electionFuture.completeExceptionally(throwable); + } + }, now); + return new ElectPreferredLeadersResult(electionFuture, partitionSet); + } + } diff --git a/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java new file mode 100644 index 0000000..73dfd64 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/PreferredLeaderNotAvailableException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class PreferredLeaderNotAvailableException extends InvalidMetadataException { + + public PreferredLeaderNotAvailableException(String message) { + super(message); + } + + public PreferredLeaderNotAvailableException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3d77100..80b118b 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.common.protocol; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; @@ -35,10 +37,10 @@ import org.apache.kafka.common.requests.ControlledShutdownRequest; import org.apache.kafka.common.requests.ControlledShutdownResponse; import org.apache.kafka.common.requests.CreateAclsRequest; import org.apache.kafka.common.requests.CreateAclsResponse; -import org.apache.kafka.common.requests.CreatePartitionsRequest; -import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateDelegationTokenRequest; import 
org.apache.kafka.common.requests.CreateDelegationTokenResponse; +import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsRequest; @@ -53,12 +55,12 @@ import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; +import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.DescribeGroupsRequest; import org.apache.kafka.common.requests.DescribeGroupsResponse; import org.apache.kafka.common.requests.DescribeLogDirsRequest; import org.apache.kafka.common.requests.DescribeLogDirsResponse; -import org.apache.kafka.common.requests.DescribeDelegationTokenRequest; -import org.apache.kafka.common.requests.DescribeDelegationTokenResponse; import org.apache.kafka.common.requests.EndTxnRequest; import org.apache.kafka.common.requests.EndTxnResponse; import org.apache.kafka.common.requests.ExpireDelegationTokenRequest; @@ -186,7 +188,9 @@ public enum ApiKeys { RENEW_DELEGATION_TOKEN(39, "RenewDelegationToken", RenewDelegationTokenRequest.schemaVersions(), RenewDelegationTokenResponse.schemaVersions()), EXPIRE_DELEGATION_TOKEN(40, "ExpireDelegationToken", ExpireDelegationTokenRequest.schemaVersions(), ExpireDelegationTokenResponse.schemaVersions()), DESCRIBE_DELEGATION_TOKEN(41, "DescribeDelegationToken", DescribeDelegationTokenRequest.schemaVersions(), DescribeDelegationTokenResponse.schemaVersions()), - DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()); + 
DELETE_GROUPS(42, "DeleteGroups", DeleteGroupsRequest.schemaVersions(), DeleteGroupsResponse.schemaVersions()), + ELECT_PREFERRED_LEADERS(43, "ElectPreferredLeaders", ElectPreferredLeadersRequestData.SCHEMAS, + ElectPreferredLeadersResponseData.SCHEMAS); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 14ca06d..5bcff43 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -72,6 +72,7 @@ import org.apache.kafka.common.errors.OffsetOutOfRangeException; import org.apache.kafka.common.errors.OperationNotAttemptedException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.PolicyViolationException; +import org.apache.kafka.common.errors.PreferredLeaderNotAvailableException; import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.ReassignmentInProgressException; import org.apache.kafka.common.errors.RebalanceInProgressException; @@ -297,7 +298,9 @@ public enum Errors { "election so the offsets cannot be guaranteed to be monotonically increasing", OffsetNotAvailableException::new), MEMBER_ID_REQUIRED(79, "The group member needs to have a valid member id before actually entering a consumer group", - MemberIdRequiredException::new); + MemberIdRequiredException::new), + PREFERRED_LEADER_NOT_AVAILABLE(80, "The preferred leader was not available", + PreferredLeaderNotAvailableException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index d16e60f..239024f 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -227,6 +227,8 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return new DescribeDelegationTokenRequest(struct, apiVersion); case DELETE_GROUPS: return new DeleteGroupsRequest(struct, apiVersion); + case ELECT_PREFERRED_LEADERS: + return new ElectPreferredLeadersRequest(struct, apiVersion); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index c0ebef1..036814c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -68,7 +68,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { protected abstract Struct toStruct(short version); - public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct) { + public static AbstractResponse parseResponse(ApiKeys apiKey, Struct struct, short version) { switch (apiKey) { case PRODUCE: return new ProduceResponse(struct); @@ -156,6 +156,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse { return new DescribeDelegationTokenResponse(struct); case DELETE_GROUPS: return new DeleteGroupsResponse(struct); + case ELECT_PREFERRED_LEADERS: + return new ElectPreferredLeadersResponse(struct, version); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " + "code should be updated to do so.", apiKey)); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java new file mode 100644 index 0000000..ab96e3b --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersRequest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.utils.CollectionUtils; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ElectPreferredLeadersRequest extends AbstractRequest { + public static class Builder extends AbstractRequest.Builder<ElectPreferredLeadersRequest> { + private final ElectPreferredLeadersRequestData data; + + public Builder(ElectPreferredLeadersRequestData data) { + super(ApiKeys.ELECT_PREFERRED_LEADERS); + this.data = data; + } + + @Override + public ElectPreferredLeadersRequest build(short version) { + return new ElectPreferredLeadersRequest(data, version); + } + + @Override + public String toString() { + return data.toString(); + } + } + + public static ElectPreferredLeadersRequestData toRequestData(Collection<TopicPartition> partitions, int timeoutMs) { + ElectPreferredLeadersRequestData d = new ElectPreferredLeadersRequestData() + .setTimeoutMs(timeoutMs); + if (partitions != null) { + for (Map.Entry<String, List<Integer>> tp : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) { + d.topicPartitions().add(new ElectPreferredLeadersRequestData.TopicPartitions().setTopic(tp.getKey()).setPartitionId(tp.getValue())); + } + } else { + d.setTopicPartitions(null); + } + return d; + } + + public static Map<TopicPartition, ApiError> 
fromResponseData(ElectPreferredLeadersResponseData data) { + Map<TopicPartition, ApiError> map = new HashMap<>(); + for (ElectPreferredLeadersResponseData.ReplicaElectionResult topicResults : data.replicaElectionResults()) { + for (ElectPreferredLeadersResponseData.PartitionResult partitionResult : topicResults.partitionResult()) { + map.put(new TopicPartition(topicResults.topic(), partitionResult.partitionId()), + new ApiError(Errors.forCode(partitionResult.errorCode()), + partitionResult.errorMessage())); + } + } + return map; + } + + private final ElectPreferredLeadersRequestData data; + private final short version; + + private ElectPreferredLeadersRequest(ElectPreferredLeadersRequestData data, short version) { + super(ApiKeys.ELECT_PREFERRED_LEADERS, version); + this.data = data; + this.version = version; + } + + public ElectPreferredLeadersRequest(Struct struct, short version) { + super(ApiKeys.ELECT_PREFERRED_LEADERS, version); + this.data = new ElectPreferredLeadersRequestData(struct, version); + this.version = version; + } + + public ElectPreferredLeadersRequestData data() { + return data; + } + + @Override + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { + ElectPreferredLeadersResponseData response = new ElectPreferredLeadersResponseData(); + response.setThrottleTimeMs(throttleTimeMs); + ApiError apiError = ApiError.fromThrowable(e); + for (TopicPartitions topic : data.topicPartitions()) { + ReplicaElectionResult electionResult = new ReplicaElectionResult().setTopic(topic.topic()); + for (Integer partitionId : topic.partitionId()) { + electionResult.partitionResult().add(new ElectPreferredLeadersResponseData.PartitionResult() + .setPartitionId(partitionId) + .setErrorCode(apiError.error().code()) + .setErrorMessage(apiError.message())); + } + response.replicaElectionResults().add(electionResult); + } + return new ElectPreferredLeadersResponse(response); + } + + public static ElectPreferredLeadersRequest parse(ByteBuffer buffer, 
short version) { + return new ElectPreferredLeadersRequest(ApiKeys.ELECT_PREFERRED_LEADERS.parseRequest(version, buffer), version); + } + + /** + * Visible for testing. + */ + @Override + public Struct toStruct() { + return data.toStruct(version); + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java new file mode 100644 index 0000000..d19a51d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectPreferredLeadersResponse.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +public class ElectPreferredLeadersResponse extends AbstractResponse { + + private final ElectPreferredLeadersResponseData data; + + public ElectPreferredLeadersResponse(ElectPreferredLeadersResponseData data) { + this.data = data; + } + + public ElectPreferredLeadersResponse(Struct struct, short version) { + this.data = new ElectPreferredLeadersResponseData(struct, version); + } + + public ElectPreferredLeadersResponseData data() { + return data; + } + + @Override + protected Struct toStruct(short version) { + return data.toStruct(version); + } + + @Override + public int throttleTimeMs() { + return data.throttleTimeMs(); + } + + @Override + public Map<Errors, Integer> errorCounts() { + HashMap<Errors, Integer> counts = new HashMap<>(); + for (ReplicaElectionResult result : data.replicaElectionResults()) { + for (PartitionResult partitionResult : result.partitionResult()) { + Errors error = Errors.forCode(partitionResult.errorCode()); + counts.put(error, counts.getOrDefault(error, 0) + 1); + } + } + return counts; + } + + public static ElectPreferredLeadersResponse parse(ByteBuffer buffer, short version) { + return new ElectPreferredLeadersResponse( + ApiKeys.ELECT_PREFERRED_LEADERS.responseSchema(version).read(buffer), version); + } + + @Override + public boolean shouldClientThrottle(short version) { + return version >= 3; + } +} \ No newline at end of file diff --git 
a/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json new file mode 100644 index 0000000..f566cdf --- /dev/null +++ b/clients/src/main/resources/common/message/ElectPreferredLeadersRequest.json @@ -0,0 +1,33 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 43, + "type": "request", + "name": "ElectPreferredLeadersRequest", + "validVersions": "0", + "fields": [ + { "name": "TopicPartitions", "type": "[]TopicPartitions", "versions": "0+", "nullableVersions": "0+", + "about": "The topic partitions to elect the preferred leader of.", + "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "The name of a topic." }, + { "name": "PartitionId", "type": "[]int32", "versions": "0+", + "about": "The partitions of this topic whose preferred leader should be elected" } + ]}, + { "name": "TimeoutMs", "type": "int32", "versions": "0+", "default": "60000", + "about": "The time in ms to wait for the election to complete." 
} + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json new file mode 100644 index 0000000..f34599c --- /dev/null +++ b/clients/src/main/resources/common/message/ElectPreferredLeadersResponse.json @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 43, + "type": "response", + "name": "ElectPreferredLeadersResponse", + "validVersions": "0", + "fields": [ + { "name": "ThrottleTimeMs", "type": "int32", "versions": "0+", + "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." 
}, + { "name": "ReplicaElectionResults", "type": "[]ReplicaElectionResult", "versions": "0+", + "about": "The error code, or 0 if there was no error.", "fields": [ + { "name": "Topic", "type": "string", "versions": "0+", + "about": "The topic name" }, + { "name": "PartitionResult", "type": "[]PartitionResult", "versions": "0+", + "about": "The results for each partition", "fields": [ + { "name": "PartitionId", "type": "int32", "versions": "0+", + "about": "The partition id" }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The result error, or zero if there was no error."}, + { "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", + "about": "The result message, or null if there was no error."} + ]} + ]} + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/README.md b/clients/src/main/resources/common/message/README.md index 5648f37..482b1dd 100644 --- a/clients/src/main/resources/common/message/README.md +++ b/clients/src/main/resources/common/message/README.md @@ -187,7 +187,7 @@ One very common pattern in Kafka is to load array elements from a message into a Map or Set for easier access. The message protocol makes this easier with the "mapKey" concept. -If some of the elemements of an array are annotated with "mapKey": true, the +If some of the elements of an array are annotated with "mapKey": true, the entire array will be treated as a linked hash set rather than a list. Elements in this set will be accessible in O(1) time with an automatically generated "find" function. 
The order of elements in the set will still be preserved, diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 82c5b1d..12b076d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -38,6 +38,7 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.GroupAuthorizationException; import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.InvalidTopicException; @@ -50,6 +51,9 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicDeletionDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; @@ -67,6 +71,7 @@ import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsResponse; import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.ElectPreferredLeadersResponse; import 
org.apache.kafka.common.requests.FindCoordinatorResponse; import org.apache.kafka.common.requests.ListGroupsResponse; import org.apache.kafka.common.requests.MetadataRequest; @@ -634,6 +639,55 @@ public class KafkaAdminClientTest { } } + @Test + public void testElectPreferredLeaders() throws Exception { + TopicPartition topic1 = new TopicPartition("topic", 0); + TopicPartition topic2 = new TopicPartition("topic", 2); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); + + // Test a call where one partition has an error. + ApiError value = ApiError.fromThrowable(new ClusterAuthorizationException(null)); + ElectPreferredLeadersResponseData responseData = new ElectPreferredLeadersResponseData(); + ReplicaElectionResult r = new ReplicaElectionResult().setTopic(topic1.topic()); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic1.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic2.partition()) + .setErrorCode(value.error().code()) + .setErrorMessage(value.message())); + responseData.replicaElectionResults().add(r); + env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData)); + ElectPreferredLeadersResult results = env.adminClient().electPreferredLeaders(asList(topic1, topic2)); + results.partitionResult(topic1).get(); + TestUtils.assertFutureError(results.partitionResult(topic2), ClusterAuthorizationException.class); + TestUtils.assertFutureError(results.all(), ClusterAuthorizationException.class); + + // Test a call where there are no errors. 
+ r.partitionResult().clear(); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic1.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + r.partitionResult().add(new PartitionResult() + .setPartitionId(topic2.partition()) + .setErrorCode(ApiError.NONE.error().code()) + .setErrorMessage(ApiError.NONE.message())); + env.kafkaClient().prepareResponse(new ElectPreferredLeadersResponse(responseData)); + + results = env.adminClient().electPreferredLeaders(asList(topic1, topic2)); + results.partitionResult(topic1).get(); + results.partitionResult(topic2).get(); + + // Now try a timeout + results = env.adminClient().electPreferredLeaders(asList(topic1, topic2), new ElectPreferredLeadersOptions().timeoutMs(100)); + TestUtils.assertFutureError(results.partitionResult(topic1), TimeoutException.class); + TestUtils.assertFutureError(results.partitionResult(topic2), TimeoutException.class); + } + } + /** * Test handling timeouts. 
*/ diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java index aa2e683..d721245 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java @@ -327,6 +327,10 @@ public class MockAdminClient extends AdminClient { throw new UnsupportedOperationException("Not implemented yet"); } + public ElectPreferredLeadersResult electPreferredLeaders(Collection<TopicPartition> partitions, ElectPreferredLeadersOptions options) { + throw new UnsupportedOperationException("Not implemented yet"); + } + @Override public CreateAclsResult createAcls(Collection<AclBinding> acls, CreateAclsOptions options) { throw new UnsupportedOperationException("Not implemented yet"); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java index ed50b93..857869f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java @@ -68,7 +68,8 @@ public class RequestContextTest { assertEquals(correlationId, responseHeader.correlationId()); Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, responseBuffer); - ApiVersionsResponse response = (ApiVersionsResponse) AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct); + ApiVersionsResponse response = (ApiVersionsResponse) + AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct, (short) 0); assertEquals(Errors.UNSUPPORTED_VERSION, response.error()); assertTrue(response.apiVersions().isEmpty()); } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index aa98031..2892bb6 100644 --- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -32,6 +32,11 @@ import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData.TopicPartitions; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.ReplicaElectionResult; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; @@ -304,6 +309,10 @@ public class RequestResponseTest { checkRequest(createRenewTokenRequest()); checkErrorResponse(createRenewTokenRequest(), new UnknownServerException()); checkResponse(createRenewTokenResponse(), 0); + checkRequest(createElectPreferredLeadersRequest()); + checkRequest(createElectPreferredLeadersRequestNullPartitions()); + checkErrorResponse(createElectPreferredLeadersRequest(), new UnknownServerException()); + checkResponse(createElectPreferredLeadersResponse(), 0); } @Test @@ -460,7 +469,7 @@ public class RequestResponseTest { Struct deserializedStruct = ApiKeys.PRODUCE.parseResponse(version, buffer); ProduceResponse v5FromBytes = (ProduceResponse) AbstractResponse.parseResponse(ApiKeys.PRODUCE, - deserializedStruct); + deserializedStruct, version); assertEquals(1, v5FromBytes.responses().size()); assertTrue(v5FromBytes.responses().containsKey(tp0)); @@ -1322,4 +1331,33 @@ public class RequestResponseTest { return new 
DescribeDelegationTokenResponse(20, Errors.NONE, tokenList); } + + private ElectPreferredLeadersRequest createElectPreferredLeadersRequestNullPartitions() { + return new ElectPreferredLeadersRequest.Builder( + new ElectPreferredLeadersRequestData() + .setTimeoutMs(100) + .setTopicPartitions(null)) + .build((short) 0); + } + + private ElectPreferredLeadersRequest createElectPreferredLeadersRequest() { + ElectPreferredLeadersRequestData data = new ElectPreferredLeadersRequestData() + .setTimeoutMs(100); + data.topicPartitions().add(new TopicPartitions().setTopic("data").setPartitionId(asList(1, 2))); + return new ElectPreferredLeadersRequest.Builder(data).build((short) 0); + } + + private ElectPreferredLeadersResponse createElectPreferredLeadersResponse() { + ElectPreferredLeadersResponseData data = new ElectPreferredLeadersResponseData().setThrottleTimeMs(200); + ReplicaElectionResult resultsByTopic = new ReplicaElectionResult().setTopic("myTopic"); + resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(0) + .setErrorCode(Errors.NONE.code()) + .setErrorMessage(Errors.NONE.message())); + resultsByTopic.partitionResult().add(new PartitionResult().setPartitionId(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()) + .setErrorMessage(Errors.UNKNOWN_TOPIC_OR_PARTITION.message())); + data.replicaElectionResults().add(resultsByTopic); + return new ElectPreferredLeadersResponse(data); + } + } diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 7bfecde..8740ed4 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -16,12 +16,20 @@ */ package kafka.admin +import java.util.Properties +import java.util.concurrent.ExecutionException + +import joptsimple.OptionSpecBuilder import 
kafka.common.AdminCommandFailedException import kafka.utils._ import kafka.zk.KafkaZkClient -import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.common.errors.TimeoutException + +import collection.JavaConverters._ import org.apache.kafka.common.utils.{Time, Utils} +import org.apache.kafka.common.security.JaasUtils +import org.apache.kafka.common.{KafkaFuture, TopicPartition} import org.apache.zookeeper.KeeperException.NodeExistsException import collection._ @@ -29,33 +37,47 @@ import collection._ object PreferredReplicaLeaderElectionCommand extends Logging { def main(args: Array[String]): Unit = { + + val timeout = 30000 + run(args, timeout) + } + def run(args: Array[String], timeout: Int = 30000): Unit = { val commandOpts = new PreferredReplicaLeaderElectionCommandOptions(args) CommandLineUtils.printHelpAndExitIfNeeded(commandOpts, "This tool helps to causes leadership for each partition to be transferred back to the 'preferred replica'," + " it can be used to balance leadership among the servers.") - CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options, commandOpts.zkConnectOpt) + CommandLineUtils.checkRequiredArgs(commandOpts.parser, commandOpts.options) - val zkConnect = commandOpts.options.valueOf(commandOpts.zkConnectOpt) - var zkClient: KafkaZkClient = null - try { - val time = Time.SYSTEM - zkClient = KafkaZkClient(zkConnect, JaasUtils.isZkSecurityEnabled, 30000, 30000, Int.MaxValue, time) + if (commandOpts.options.has(commandOpts.bootstrapServerOpt) == commandOpts.options.has(commandOpts.zkConnectOpt)) { + CommandLineUtils.printUsageAndDie(commandOpts.parser, s"Exactly one of '${commandOpts.bootstrapServerOpt}' or '${commandOpts.zkConnectOpt}' must be provided") + } + + val partitionsForPreferredReplicaElection = + if (commandOpts.options.has(commandOpts.jsonFileOpt)) + 
Some(parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt)))) + else + None - val partitionsForPreferredReplicaElection = - if (!commandOpts.options.has(commandOpts.jsonFileOpt)) - zkClient.getAllPartitions() + val preferredReplicaElectionCommand = if (commandOpts.options.has(commandOpts.zkConnectOpt)) { + println(s"Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.") + println(s"Use --bootstrap-server instead to specify a broker to connect to.") + new ZkCommand(commandOpts.options.valueOf(commandOpts.zkConnectOpt), + JaasUtils.isZkSecurityEnabled, + timeout) + } else { + val adminProps = if (commandOpts.options.has(commandOpts.adminClientConfigOpt)) + Utils.loadProps(commandOpts.options.valueOf(commandOpts.adminClientConfigOpt)) else - parsePreferredReplicaElectionData(Utils.readFileAsString(commandOpts.options.valueOf(commandOpts.jsonFileOpt))) - val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkClient, partitionsForPreferredReplicaElection) + new Properties() + adminProps.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, commandOpts.options.valueOf(commandOpts.bootstrapServerOpt)) + adminProps.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, timeout.toString) + new AdminClientCommand(adminProps) + } - preferredReplicaElectionCommand.moveLeaderToPreferredReplica() - } catch { - case e: Throwable => - println("Failed to start preferred replica election") - println(Utils.stackTrace(e)) + try { + preferredReplicaElectionCommand.electPreferredLeaders(partitionsForPreferredReplicaElection) } finally { - if (zkClient != null) - zkClient.close() + preferredReplicaElectionCommand.close() } } @@ -101,14 +123,165 @@ object PreferredReplicaLeaderElectionCommand extends Logging { .withRequiredArg .describedAs("list of partitions for which preferred replica leader election needs to be triggered") .ofType(classOf[String]) - val zkConnectOpt = 
parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + - "form host:port. Multiple URLS can be given to allow fail-over.") + + private val zookeeperOptBuilder: OptionSpecBuilder = parser.accepts("zookeeper", + "DEPRECATED. The connection string for the zookeeper connection in the " + + "form host:port. Multiple URLS can be given to allow fail-over. " + + "Replaced by --bootstrap-server, REQUIRED unless --bootstrap-server is given.") + private val bootstrapOptBuilder: OptionSpecBuilder = parser.accepts("bootstrap-server", + "A hostname and port for the broker to connect to, " + + "in the form host:port. Multiple comma-separated URLs can be given. REQUIRED unless --zookeeper is given.") + parser.mutuallyExclusive(zookeeperOptBuilder, bootstrapOptBuilder) + val bootstrapServerOpt = bootstrapOptBuilder + .withRequiredArg + .describedAs("host:port") + .ofType(classOf[String]) + val zkConnectOpt = zookeeperOptBuilder .withRequiredArg .describedAs("urls") .ofType(classOf[String]) + val adminClientConfigOpt = parser.accepts("admin.config", + "Admin client config properties file to pass to the admin client when --bootstrap-server is given.") + .availableIf(bootstrapServerOpt) + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) + + parser.accepts("") options = parser.parse(args: _*) } + + /** Abstraction over different ways to perform a leader election */ + trait Command { + /** Elect the preferred leader for the given {@code partitionsForElection}. + * If the given {@code partitionsForElection} are None then elect the preferred leader for all partitions. 
+ */ + def electPreferredLeaders(partitionsForElection: Option[Set[TopicPartition]]) : Unit + def close() : Unit + } + + class ZkCommand(zkConnect: String, isSecure: Boolean, timeout: Int) + extends Command { + var zkClient: KafkaZkClient = null + + val time = Time.SYSTEM + zkClient = KafkaZkClient(zkConnect, isSecure, timeout, timeout, Int.MaxValue, time) + + override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]) { + try { + val topics = + partitionsFromUser match { + case Some(partitions) => + partitions.map(_.topic).toSet + case None => + zkClient.getAllPartitions().map(_.topic) + } + + val partitionsFromZk = zkClient.getPartitionsForTopics(topics).flatMap{ case (topic, partitions) => + partitions.map(new TopicPartition(topic, _)) + }.toSet + + val (validPartitions, invalidPartitions) = + partitionsFromUser match { + case Some(partitions) => + partitions.partition(partitionsFromZk.contains) + case None => + (zkClient.getAllPartitions(), Set.empty) + } + PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkClient, validPartitions) + + println("Successfully started preferred replica election for partitions %s".format(validPartitions)) + invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p))) + } catch { + case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e) + } + } + + override def close(): Unit = { + if (zkClient != null) + zkClient.close() + } + } + + /** Election via AdminClient.electPreferredLeaders() */ + class AdminClientCommand(adminClientProps: Properties) + extends Command with Logging { + + val adminClient = org.apache.kafka.clients.admin.AdminClient.create(adminClientProps) + + /** + * Wait until the given future has completed, then return whether it completed exceptionally. 
+ * Because KafkaFuture.isCompletedExceptionally doesn't wait for a result + */ + private def completedExceptionally[T](future: KafkaFuture[T]): Boolean = { + try { + future.get() + false + } catch { + case (_: Throwable) => + true + } + } + + override def electPreferredLeaders(partitionsFromUser: Option[Set[TopicPartition]]): Unit = { + val partitions = partitionsFromUser match { + case Some(partitionsFromUser) => partitionsFromUser.asJava + case None => null + } + debug(s"Calling AdminClient.electPreferredLeaders($partitions)") + val result = adminClient.electPreferredLeaders(partitions) + // wait for all results + + val attemptedPartitions = partitionsFromUser match { + case Some(partitionsFromUser) => partitions.asScala + case None => try { + result.partitions().get.asScala + } catch { + case e: ExecutionException => + val cause = e.getCause + if (cause.isInstanceOf[TimeoutException]) { + // We timed out, or don't even know the attempted partitions + println("Timeout waiting for election results") + } + throw new AdminCommandFailedException(null, cause) + case e: Throwable => + // We don't even know the attempted partitions + println("Error while making request") + e.printStackTrace() + return + } + } + + val (exceptional, ok) = attemptedPartitions.map(tp => tp -> result.partitionResult(tp)). 
+ partition { case (_, partitionResult) => completedExceptionally(partitionResult) } + + if (!ok.isEmpty) { + println(s"Successfully completed preferred replica election for partitions ${ok.map{ case (tp, future) => tp }.mkString(", ")}") + } + if (!exceptional.isEmpty) { + val adminException = new AdminCommandFailedException( + s"${exceptional.size} preferred replica(s) could not be elected") + for ((partition, void) <- exceptional) { + val exception = try { + void.get() + new AdminCommandFailedException("Exceptional future with no exception") + } catch { + case e: ExecutionException => e.getCause + } + println(s"Error completing preferred replica election for partition $partition: $exception") + adminException.addSuppressed(exception) + } + throw adminException + } + + } + + override def close(): Unit = { + debug("Closing AdminClient") + adminClient.close() + } + } } class PreferredReplicaLeaderElectionCommand(zkClient: KafkaZkClient, partitionsFromUser: scala.collection.Set[TopicPartition]) { diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala index c93e9e7..54e3a9e 100644 --- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala @@ -28,6 +28,7 @@ import org.apache.kafka.common.errors.ControllerMovedException import org.apache.kafka.common.utils.Time import scala.collection._ +import scala.collection.JavaConverters._ object ControllerEventManager { val ControllerEventThreadName = "controller-event-thread" @@ -69,6 +70,10 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll } def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) { + queue.asScala.foreach(evt => + if (evt.isInstanceOf[PreemptableControllerEvent]) + evt.asInstanceOf[PreemptableControllerEvent].preempt() + ) queue.clear() put(event) } diff --git 
a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index c8cf446..ea23beb 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -16,6 +16,7 @@ */ package kafka.controller +import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.{CountDownLatch, TimeUnit} import com.yammer.metrics.core.Gauge @@ -32,13 +33,13 @@ import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.errors.{BrokerNotAvailableException, ControllerMovedException, StaleBrokerEpochException} import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.{ApiKeys, Errors} -import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, LeaderAndIsrResponse, StopReplicaResponse} +import org.apache.kafka.common.requests.{AbstractControlRequest, AbstractResponse, ApiError, LeaderAndIsrResponse, StopReplicaResponse} import org.apache.kafka.common.utils.Time import org.apache.zookeeper.KeeperException import org.apache.zookeeper.KeeperException.Code import scala.collection._ -import scala.util.Try +import scala.util.{Failure, Try} object KafkaController extends Logging { val InitialControllerEpoch = 0 @@ -268,7 +269,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti maybeTriggerPartitionReassignment(controllerContext.partitionsBeingReassigned.keySet) topicDeletionManager.tryTopicDeletion() val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections() - onPreferredReplicaElection(pendingPreferredReplicaElections) + onPreferredReplicaElection(pendingPreferredReplicaElections, ZkTriggered) info("Starting the controller scheduler") kafkaScheduler.startup() if (config.autoLeaderRebalanceEnable) { @@ -586,7 +587,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti val 
partitionsToBeRemovedFromReassignment = scala.collection.mutable.Set.empty[TopicPartition] topicPartitions.foreach { tp => if (topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic)) { - error(s"Skipping reassignment of $tp since the topic is currently being deleted") + info(s"Skipping reassignment of $tp since the topic is currently being deleted") partitionsToBeRemovedFromReassignment.add(tp) } else { val reassignedPartitionContext = controllerContext.partitionsBeingReassigned.get(tp).getOrElse { @@ -628,17 +629,38 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti removePartitionsFromReassignedPartitions(partitionsToBeRemovedFromReassignment) } - private def onPreferredReplicaElection(partitions: Set[TopicPartition], isTriggeredByAutoRebalance: Boolean = false) { + sealed trait ElectionType + object AutoTriggered extends ElectionType + object ZkTriggered extends ElectionType + object AdminClientTriggered extends ElectionType + + /** + * Attempt to elect the preferred replica as leader for each of the given partitions. + * @param partitions The partitions to have their preferred leader elected + * @param electionType The election type + * @return A map of failed elections where keys are partitions which had an error and the corresponding value is + * the exception that was thrown. 
+ */ + private def onPreferredReplicaElection(partitions: Set[TopicPartition], + electionType: ElectionType): Map[TopicPartition, Throwable] = { info(s"Starting preferred replica leader election for partitions ${partitions.mkString(",")}") try { - partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, Option(PreferredReplicaPartitionLeaderElectionStrategy)) - } catch { - case e: ControllerMovedException => - error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")} because controller has moved to another broker.", e) - throw e - case e: Throwable => error(s"Error completing preferred replica leader election for partitions ${partitions.mkString(",")}", e) + val results = partitionStateMachine.handleStateChanges(partitions.toSeq, OnlinePartition, + Option(PreferredReplicaPartitionLeaderElectionStrategy)) + if (electionType != AdminClientTriggered) { + results.foreach { case (tp, throwable) => + if (throwable.isInstanceOf[ControllerMovedException]) { + error(s"Error completing preferred replica leader election for partition $tp because controller has moved to another broker.", throwable) + throw throwable + } else { + error(s"Error completing preferred replica leader election for partition $tp", throwable) + } + } + } + return results; } finally { - removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance) + if (electionType != AdminClientTriggered) + removePartitionsFromPreferredReplicaElection(partitions, electionType == AutoTriggered) } } @@ -884,7 +906,7 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti if (!isTriggeredByAutoRebalance) { zkClient.deletePreferredReplicaElection(controllerContext.epochZkVersion) // Ensure we detect future preferred replica leader elections - eventManager.put(PreferredReplicaLeaderElection) + eventManager.put(PreferredReplicaLeaderElection(None)) } } @@ -983,7 +1005,7 @@ class KafkaController(val config: 
KafkaConfig, zkClient: KafkaZkClient, time: Ti controllerContext.partitionsBeingReassigned.isEmpty && !topicDeletionManager.isTopicQueuedUpForDeletion(tp.topic) && controllerContext.allTopics.contains(tp.topic)) - onPreferredReplicaElection(candidatePartitions.toSet, isTriggeredByAutoRebalance = true) + onPreferredReplicaElection(candidatePartitions.toSet, AutoTriggered) } } } @@ -1022,11 +1044,15 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } - case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends ControllerEvent { + case class ControlledShutdown(id: Int, brokerEpoch: Long, controlledShutdownCallback: Try[Set[TopicPartition]] => Unit) extends PreemptableControllerEvent { def state = ControllerState.ControlledShutdown - override def process(): Unit = { + override def handlePreempt(): Unit = { + controlledShutdownCallback(Failure(new ControllerMovedException("Controller moved to another broker"))) + } + + override def handleProcess(): Unit = { val controlledShutdownResult = Try { doControlledShutdown(id) } controlledShutdownCallback(controlledShutdownResult) } @@ -1517,22 +1543,75 @@ class KafkaController(val config: KafkaConfig, zkClient: KafkaZkClient, time: Ti } } - case object PreferredReplicaLeaderElection extends ControllerEvent { + type ElectPreferredLeadersCallback = (Map[TopicPartition, Int], Map[TopicPartition, ApiError])=>Unit + + def electPreferredLeaders(partitions: Set[TopicPartition], callback: ElectPreferredLeadersCallback = { (_,_) => }): Unit = + eventManager.put(PreferredReplicaLeaderElection(Some(partitions), AdminClientTriggered, callback)) + + case class PreferredReplicaLeaderElection(partitionsFromAdminClientOpt: Option[Set[TopicPartition]], + electionType: ElectionType = ZkTriggered, + callback: ElectPreferredLeadersCallback = (_,_) =>{}) extends PreemptableControllerEvent { override def state: ControllerState = 
ControllerState.ManualLeaderBalance - override def process(): Unit = { - if (!isActive) return + override def handlePreempt(): Unit = { + callback(Map.empty, partitionsFromAdminClientOpt match { + case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap + case None => Map.empty + }) + } + + override def handleProcess(): Unit = { + if (!isActive) { + callback(Map.empty, partitionsFromAdminClientOpt match { + case Some(partitions) => partitions.map(partition => partition -> new ApiError(Errors.NOT_CONTROLLER, null)).toMap + case None => Map.empty + }) + } else { + // We need to register the watcher if the path doesn't exist in order to detect future preferred replica + // leader elections and we get the `path exists` check for free + if (electionType == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) { + val partitions = partitionsFromAdminClientOpt match { + case Some(partitions) => partitions + case None => zkClient.getPreferredReplicaElection + } + + val (validPartitions, invalidPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp)) + invalidPartitions.foreach { p => + info(s"Skipping preferred replica leader election for partition ${p} since it doesn't exist.") + } - // We need to register the watcher if the path doesn't exist in order to detect future preferred replica - // leader elections and we get the `path exists` check for free - if (zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) { - val partitions = zkClient.getPreferredReplicaElection - val partitionsForTopicsToBeDeleted = partitions.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic)) - if (partitionsForTopicsToBeDeleted.nonEmpty) { - error(s"Skipping preferred replica election for partitions $partitionsForTopicsToBeDeleted since the " + - "respective topics are being deleted") + val 
(partitionsBeingDeleted, livePartitions) = validPartitions.partition(partition => + topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic)) + if (partitionsBeingDeleted.nonEmpty) { + warn(s"Skipping preferred replica election for partitions $partitionsBeingDeleted " + + s"since the respective topics are being deleted") + } + // partition those where preferred is already leader + val (electablePartitions, alreadyPreferred) = livePartitions.partition { partition => + val assignedReplicas = controllerContext.partitionReplicaAssignment(partition) + val preferredReplica = assignedReplicas.head + val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader + currentLeader != preferredReplica + } + + val electionErrors = onPreferredReplicaElection(electablePartitions, electionType) + val successfulPartitions = electablePartitions -- electionErrors.keySet + val results = electionErrors.map { case (partition, ex) => + val apiError = if (ex.isInstanceOf[StateChangeFailedException]) + new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE, ex.getMessage) + else + ApiError.fromThrowable(ex) + partition -> apiError + } ++ + alreadyPreferred.map(_ -> ApiError.NONE) ++ + partitionsBeingDeleted.map(_ -> new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted")) ++ + invalidPartitions.map ( tp => tp -> new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, s"The partition does not exist.") + ) + debug(s"PreferredReplicaLeaderElection waiting: $successfulPartitions, results: $results") + callback(successfulPartitions.map( + tp => tp->controllerContext.partitionReplicaAssignment(tp).head).toMap, + results) } - onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted) } } } @@ -1653,7 +1732,7 @@ object IsrChangeNotificationHandler { class PreferredReplicaElectionHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler { override val path: String = 
PreferredReplicaElectionZNode.path - override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection) + override def handleCreation(): Unit = eventManager.put(controller.PreferredReplicaLeaderElection(None)) } class ControllerChangeHandler(controller: KafkaController, eventManager: ControllerEventManager) extends ZNodeChangeHandler { @@ -1712,3 +1791,25 @@ sealed trait ControllerEvent { def state: ControllerState def process(): Unit } + +/** + * A `ControllerEvent`, such as one with a client callback, which needs specific handling in the event of ZK session expiration. + */ +sealed trait PreemptableControllerEvent extends ControllerEvent { + + val spent = new AtomicBoolean(false) + + final def preempt(): Unit = { + if (!spent.getAndSet(true)) + handlePreempt() + } + + final def process(): Unit = { + if (!spent.getAndSet(true)) + handleProcess() + } + + def handlePreempt(): Unit + + def handleProcess(): Unit +} diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index e4f0532..ad73979 100755 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -125,22 +125,36 @@ class PartitionStateMachine(config: KafkaConfig, // It is important to trigger leader election for those partitions. } + /** + * Try to change the state of the given partitions to the given targetState, using the given + * partitionLeaderElectionStrategyOpt if a leader election is required. + * @param partitions The partitions + * @param targetState The state + * @param partitionLeaderElectionStrategyOpt The leader election strategy if a leader election is required. 
+ * @return partitions and corresponding throwable for those partitions which could not transition to the given state + */ def handleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState, - partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Unit = { + partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy] = None): Map[TopicPartition, Throwable] = { if (partitions.nonEmpty) { try { controllerBrokerRequestBatch.newBatch() - doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt) + val errors = doHandleStateChanges(partitions, targetState, partitionLeaderElectionStrategyOpt) controllerBrokerRequestBatch.sendRequestsToBrokers(controllerContext.epoch) + errors } catch { case e: ControllerMovedException => error(s"Controller moved to another broker when moving some partitions to $targetState state", e) throw e - case e: Throwable => error(s"Error while moving some partitions to $targetState state", e) + case e: Throwable => + error(s"Error while moving some partitions to $targetState state", e) + partitions.map { _ -> e }.toMap } + } else { + Map.empty[TopicPartition, Throwable] } } + def partitionsInState(state: PartitionState): Set[TopicPartition] = { partitionState.filter { case (_, s) => s == state }.keySet.toSet } @@ -183,7 +197,7 @@ class PartitionStateMachine(config: KafkaConfig, * @param targetState The end state that the partition should be moved to */ private def doHandleStateChanges(partitions: Seq[TopicPartition], targetState: PartitionState, - partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Unit = { + partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]): Map[TopicPartition, Throwable] = { val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch) partitions.foreach(partition => partitionState.getOrElseUpdate(partition, NonExistentPartition)) val (validPartitions, invalidPartitions) 
= partitions.partition(partition => isValidTransition(partition, targetState)) @@ -195,6 +209,7 @@ class PartitionStateMachine(config: KafkaConfig, s"assigned replicas ${controllerContext.partitionReplicaAssignment(partition).mkString(",")}") changeStateTo(partition, partitionState(partition), NewPartition) } + Map.empty case OnlinePartition => val uninitializedPartitions = validPartitions.filter(partition => partitionState(partition) == NewPartition) val partitionsToElectLeader = validPartitions.filter(partition => partitionState(partition) == OfflinePartition || partitionState(partition) == OnlinePartition) @@ -207,23 +222,28 @@ class PartitionStateMachine(config: KafkaConfig, } } if (partitionsToElectLeader.nonEmpty) { - val successfulElections = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get) + val (successfulElections, failedElections) = electLeaderForPartitions(partitionsToElectLeader, partitionLeaderElectionStrategyOpt.get) successfulElections.foreach { partition => stateChangeLog.trace(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " + s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}") changeStateTo(partition, partitionState(partition), OnlinePartition) } + failedElections + } else { + Map.empty } case OfflinePartition => validPartitions.foreach { partition => stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState") changeStateTo(partition, partitionState(partition), OfflinePartition) } + Map.empty case NonExistentPartition => validPartitions.foreach { partition => stateChangeLog.trace(s"Changed partition $partition state from ${partitionState(partition)} to $targetState") changeStateTo(partition, partitionState(partition), NonExistentPartition) } + Map.empty } } @@ -283,11 +303,14 @@ class PartitionStateMachine(config: KafkaConfig, * Repeatedly attempt to elect leaders for multiple partitions 
until there are no more remaining partitions to retry. * @param partitions The partitions that we're trying to elect leaders for. * @param partitionLeaderElectionStrategy The election strategy to use. - * @return The partitions that successfully had a leader elected. + * @return A pair with first element of which is the partitions that successfully had a leader elected + * and the second element a map of failed partition to the corresponding thrown exception. */ - private def electLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): Seq[TopicPartition] = { + private def electLeaderForPartitions(partitions: Seq[TopicPartition], + partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy): (Seq[TopicPartition], Map[TopicPartition, Throwable]) = { val successfulElections = mutable.Buffer.empty[TopicPartition] var remaining = partitions + var failures = Map.empty[TopicPartition, Throwable] while (remaining.nonEmpty) { val (success, updatesToRetry, failedElections) = doElectLeaderForPartitions(partitions, partitionLeaderElectionStrategy) remaining = updatesToRetry @@ -295,8 +318,9 @@ class PartitionStateMachine(config: KafkaConfig, failedElections.foreach { case (partition, e) => logFailedStateChange(partition, partitionState(partition), OnlinePartition, e) } + failures ++= failedElections } - successfulElections + (successfulElections, failures) } /** diff --git a/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala new file mode 100644 index 0000000..38b07ad --- /dev/null +++ b/core/src/main/scala/kafka/server/DelayedElectPreferredLeader.scala @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError + +import scala.collection.{Map, mutable} + +/** A delayed elect preferred leader operation that can be created by the replica manager and watched + * in the elect preferred leader purgatory + */ +class DelayedElectPreferredLeader(delayMs: Long, + expectedLeaders: Map[TopicPartition, Int], + results: Map[TopicPartition, ApiError], + replicaManager: ReplicaManager, + responseCallback: Map[TopicPartition, ApiError] => Unit) + extends DelayedOperation(delayMs) { + + var waitingPartitions = expectedLeaders + val fullResults = results.to[mutable.Set] + + + /** + * Call-back to execute when a delayed operation gets expired and hence forced to complete. + */ + override def onExpiration(): Unit = {} + + /** + * Process for completing an operation; This function needs to be defined + * in subclasses and will be called exactly once in forceComplete() + */ + override def onComplete(): Unit = { + // This could be called to force complete, so I need the full list of partitions, so I can time them all out. 
+ updateWaiting() + val timedout = waitingPartitions.map{ + case (tp, leader) => tp -> new ApiError(Errors.REQUEST_TIMED_OUT, null) + }.toMap + responseCallback(timedout ++ fullResults) + } + + private def timeoutWaiting = { + waitingPartitions.map(partition => partition -> new ApiError(Errors.REQUEST_TIMED_OUT, null)).toMap + } + + /** + * Try to complete the delayed operation by first checking if the operation + * can be completed by now. If yes execute the completion logic by calling + * forceComplete() and return true iff forceComplete returns true; otherwise return false + * + * This function needs to be defined in subclasses + */ + override def tryComplete(): Boolean = { + updateWaiting() + debug(s"tryComplete() waitingPartitions: $waitingPartitions") + waitingPartitions.isEmpty && forceComplete() + } + + private def updateWaiting() = { + waitingPartitions.foreach{case (tp, leader) => + val ps = replicaManager.metadataCache.getPartitionInfo(tp.topic, tp.partition) + ps match { + case Some(ps) => + if (leader == ps.basePartitionState.leader) { + waitingPartitions -= tp + fullResults += tp -> ApiError.NONE + } + case None => + } + } + } + +} diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 67020a8..aab9a38 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,6 +43,7 @@ import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.FatalExitError import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANSACTION_STATE_TOPIC_NAME, isInternal} +import org.apache.kafka.common.message.ElectPreferredLeadersResponseData import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ListenerName, Send} import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -146,6 +147,7 @@ class KafkaApis(val requestChannel: 
RequestChannel, case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request) case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request) case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request) + case ApiKeys.ELECT_PREFERRED_LEADERS => handleElectPreferredReplicaLeader(request) } } catch { case e: FatalExitError => throw e @@ -253,6 +255,11 @@ class KafkaApis(val requestChannel: RequestChannel, quotas.request.updateQuotaMetricConfigs() } } + if (replicaManager.hasDelayedElectionOperations) { + updateMetadataRequest.partitionStates.asScala.foreach { case (tp, ps) => + replicaManager.tryCompleteElection(new TopicPartitionOperationKey(tp.topic(), tp.partition())) + } + } sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE)) } } @@ -2227,6 +2234,48 @@ class KafkaApis(val requestChannel: RequestChannel, true } + def handleElectPreferredReplicaLeader(request: RequestChannel.Request): Unit = { + + val electionRequest = request.body[ElectPreferredLeadersRequest] + val partitions = + if (electionRequest.data().topicPartitions() == null) { + metadataCache.getAllPartitions() + } else { + electionRequest.data().topicPartitions().asScala.flatMap{tp => + tp.partitionId().asScala.map(partitionId => new TopicPartition(tp.topic, partitionId))}.toSet + } + def sendResponseCallback(result: Map[TopicPartition, ApiError]): Unit = { + sendResponseMaybeThrottle(request, requestThrottleMs => { + val results = result. + groupBy{case (tp, error) => tp.topic}. 
+ map{case (topic, ps) => new ElectPreferredLeadersResponseData.ReplicaElectionResult() + .setTopic(topic) + .setPartitionResult(ps.map{ + case (tp, error) => + new ElectPreferredLeadersResponseData.PartitionResult() + .setErrorCode(error.error.code) + .setErrorMessage(error.message()) + .setPartitionId(tp.partition)}.toList.asJava)} + val data = new ElectPreferredLeadersResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setReplicaElectionResults(results.toList.asJava) + new ElectPreferredLeadersResponse(data)}) + } + if (!authorize(request.session, Alter, Resource.ClusterResource)) { + val error = new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null); + val partitionErrors = + if (electionRequest.data().topicPartitions() == null) { + // Don't leak the set of partitions if the client lack authz + Map.empty[TopicPartition, ApiError] + } else { + partitions.map(partition => partition -> error).toMap + } + sendResponseCallback(partitionErrors) + } else { + replicaManager.electPreferredLeaders(controller, partitions, sendResponseCallback, electionRequest.data().timeoutMs()) + } + } + def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!isAuthorizedClusterAction(request)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") diff --git a/core/src/main/scala/kafka/server/MetadataCache.scala b/core/src/main/scala/kafka/server/MetadataCache.scala index 3fefc7b..ec5a2b9 100755 --- a/core/src/main/scala/kafka/server/MetadataCache.scala +++ b/core/src/main/scala/kafka/server/MetadataCache.scala @@ -137,6 +137,12 @@ class MetadataCache(brokerId: Int) extends Logging { getAllTopics(metadataSnapshot) } + def getAllPartitions(): Set[TopicPartition] = { + metadataSnapshot.partitionStates.flatMap { case (topicName, partitionsAndStates) => + partitionsAndStates.keys.map(partitionId => new TopicPartition(topicName, partitionId.toInt)) + }.toSet + } + private def getAllTopics(snapshot: MetadataSnapshot): Set[String] = { 
snapshot.partitionStates.keySet } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5dfb2e6..5e41e35 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -146,6 +146,7 @@ class ReplicaManager(val config: KafkaConfig, val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce], val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch], val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords], + val delayedElectPreferredLeaderPurgatory: DelayedOperationPurgatory[DelayedElectPreferredLeader], threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup { def this(config: KafkaConfig, @@ -171,6 +172,8 @@ class ReplicaManager(val config: KafkaConfig, DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", brokerId = config.brokerId, purgeInterval = config.deleteRecordsPurgatoryPurgeIntervalRequests), + DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = "ElectPreferredLeader", brokerId = config.brokerId), threadNamePrefix) } @@ -318,6 +321,13 @@ class ReplicaManager(val config: KafkaConfig, debug("Request key %s unblocked %d DeleteRecordsRequest.".format(key.keyLabel, completed)) } + def hasDelayedElectionOperations = delayedElectPreferredLeaderPurgatory.delayed != 0 + + def tryCompleteElection(key: DelayedOperationKey): Unit = { + val completed = delayedElectPreferredLeaderPurgatory.checkAndComplete(key) + debug("Request key %s unblocked %d ElectPreferredLeader.".format(key.keyLabel, completed)) + } + def startup() { // start ISR expiration thread // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR @@ -1476,6 +1486,7 @@ class ReplicaManager(val config: KafkaConfig, delayedFetchPurgatory.shutdown() delayedProducePurgatory.shutdown() 
delayedDeleteRecordsPurgatory.shutdown() + delayedElectPreferredLeaderPurgatory.shutdown() if (checkpointHW) checkpointHighWatermarks() info("Shut down completely") @@ -1508,4 +1519,30 @@ class ReplicaManager(val config: KafkaConfig, tp -> epochEndOffset } } + + def electPreferredLeaders(controller: KafkaController, + partitions: Set[TopicPartition], + responseCallback: Map[TopicPartition, ApiError] => Unit, + requestTimeout: Long): Unit = { + + val deadline = time.milliseconds() + requestTimeout + + def electionCallback(expectedLeaders: Map[TopicPartition, Int], + results: Map[TopicPartition, ApiError]): Unit = { + if (expectedLeaders.nonEmpty) { + val watchKeys = expectedLeaders.map{ + case (tp, leader) => new TopicPartitionOperationKey(tp.topic, tp.partition) + }.toSeq + delayedElectPreferredLeaderPurgatory.tryCompleteElseWatch( + new DelayedElectPreferredLeader(deadline - time.milliseconds(), expectedLeaders, results, + this, responseCallback), + watchKeys) + } else { + // There are no partitions actually being elected, so return immediately + responseCallback(results) + } + } + + controller.electPreferredLeaders(partitions, electionCallback) + } } diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index 92d1758..5a3278c 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -47,13 +47,14 @@ import org.junit.Assert._ import scala.util.Random import scala.collection.JavaConverters._ -import java.lang.{Long => JLong} import kafka.zk.KafkaZkClient import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} +import java.lang.{Long => JLong} + /** * An integration test of the KafkaAdminClient. 
* @@ -99,6 +100,8 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, s"${listenerName.value}:${securityProtocol.name}") config.setProperty(KafkaConfig.DeleteTopicEnableProp, "true") config.setProperty(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") + config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, "false") + config.setProperty(KafkaConfig.ControlledShutdownEnableProp, "false") // We set this in order to test that we don't expose sensitive data via describe configs. This will already be // set for subclasses with security enabled and we don't want to overwrite it. if (!config.containsKey(KafkaConfig.SslTruststorePasswordProp)) @@ -1198,6 +1201,204 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { } } + @Test + def testElectPreferredLeaders(): Unit = { + client = AdminClient.create(createConfig) + + val prefer0 = Seq(0, 1, 2) + val prefer1 = Seq(1, 2, 0) + val prefer2 = Seq(2, 0, 1) + + val partition1 = new TopicPartition("elect-preferred-leaders-topic-1", 0) + TestUtils.createTopic(zkClient, partition1.topic, Map[Int, Seq[Int]](partition1.partition -> prefer0), servers) + + val partition2 = new TopicPartition("elect-preferred-leaders-topic-2", 0) + TestUtils.createTopic(zkClient, partition2.topic, Map[Int, Seq[Int]](partition2.partition -> prefer0), servers) + + def currentLeader(topicPartition: TopicPartition) = + client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). + get.partitions.get(topicPartition.partition).leader.id + + def preferredLeader(topicPartition: TopicPartition) = + client.describeTopics(asList(topicPartition.topic)).values.get(topicPartition.topic). 
+ get.partitions.get(topicPartition.partition).replicas.get(0).id + + def waitForLeaderToBecome(topicPartition: TopicPartition, leader: Int) = + TestUtils.waitUntilTrue(() => currentLeader(topicPartition) == leader, s"Expected leader to become $leader", 10000) + + /** Changes the <i>preferred</i> leader without changing the <i>current</i> leader. */ + def changePreferredLeader(newAssignment: Seq[Int]) = { + val preferred = newAssignment.head + val prior1 = currentLeader(partition1) + val prior2 = currentLeader(partition2) + + var m = Map.empty[TopicPartition, Seq[Int]] + + if (prior1 != preferred) + m += partition1 -> newAssignment + if (prior2 != preferred) + m += partition2 -> newAssignment + + zkClient.createPartitionReassignment(m) + TestUtils.waitUntilTrue( + () => preferredLeader(partition1) == preferred && preferredLeader(partition2) == preferred, + s"Expected preferred leader to become $preferred, but is ${preferredLeader(partition1)} and ${preferredLeader(partition2)}", 10000) + // Check the leader hasn't moved + assertEquals(prior1, currentLeader(partition1)) + assertEquals(prior2, currentLeader(partition2)) + } + + // Check current leaders are 0 + assertEquals(0, currentLeader(partition1)) + assertEquals(0, currentLeader(partition2)) + + // Noop election + var electResult = client.electPreferredLeaders(asList(partition1)) + electResult.partitionResult(partition1).get() + assertEquals(0, currentLeader(partition1)) + + // Noop election with null partitions + electResult = client.electPreferredLeaders(null) + electResult.partitionResult(partition1).get() + assertEquals(0, currentLeader(partition1)) + electResult.partitionResult(partition2).get() + assertEquals(0, currentLeader(partition2)) + + // Now change the preferred leader to 1 + changePreferredLeader(prefer1) + + // meaningful election + electResult = client.electPreferredLeaders(asList(partition1)) + assertEquals(Set(partition1).asJava, electResult.partitions.get) + 
electResult.partitionResult(partition1).get() + waitForLeaderToBecome(partition1, 1) + + // topic 2 unchanged + try { + electResult.partitionResult(partition2).get() + fail("topic 2 wasn't requested") + } catch { + case e: ExecutionException => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("Preferred leader election for partition \"elect-preferred-leaders-topic-2-0\" was not attempted", + cause.getMessage) + assertEquals(0, currentLeader(partition2)) + } + + // meaningful election with null partitions + electResult = client.electPreferredLeaders(null) + assertEquals(Set(partition1, partition2), electResult.partitions.get.asScala.filterNot(_.topic == "__consumer_offsets")) + electResult.partitionResult(partition1).get() + waitForLeaderToBecome(partition1, 1) + electResult.partitionResult(partition2).get() + waitForLeaderToBecome(partition2, 1) + + // unknown topic + val unknownPartition = new TopicPartition("topic-does-not-exist", 0) + electResult = client.electPreferredLeaders(asList(unknownPartition)) + assertEquals(Set(unknownPartition).asJava, electResult.partitions.get) + try { + electResult.partitionResult(unknownPartition).get() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("The partition does not exist.", + cause.getMessage) + assertEquals(1, currentLeader(partition1)) + assertEquals(1, currentLeader(partition2)) + } + + // Now change the preferred leader to 2 + changePreferredLeader(prefer2) + + // mixed results + electResult = client.electPreferredLeaders(asList(unknownPartition, partition1)) + assertEquals(Set(unknownPartition, partition1).asJava, electResult.partitions.get) + waitForLeaderToBecome(partition1, 2) + assertEquals(1, currentLeader(partition2)) + try { + electResult.partitionResult(unknownPartition).get() + } catch { + case e: Exception => + val cause = e.getCause 
+ assertTrue(cause.isInstanceOf[UnknownTopicOrPartitionException]) + assertEquals("The partition does not exist.", + cause.getMessage) + } + + // dupe partitions + electResult = client.electPreferredLeaders(asList(partition2, partition2)) + assertEquals(Set(partition2).asJava, electResult.partitions.get) + electResult.partitionResult(partition2).get() + waitForLeaderToBecome(partition2, 2) + + // Now change the preferred leader to 1 + changePreferredLeader(prefer1) + // but shut it down... + servers(1).shutdown() + waitUntilTrue ( + () => { + val description = client.describeTopics(Set (partition1.topic(), partition2.topic()).asJava).all().get() + return !description.asScala.flatMap{ + case (topic, description) => description.partitions().asScala.map( + partition => partition.isr().asScala).flatten + }.exists(node => node.id == 1) + }, + "Expect broker 1 to no longer be in any ISR" + ) + + // ... now what happens if we try to elect the preferred leader and it's down? + val shortTimeout = new ElectPreferredLeadersOptions().timeoutMs(10000) + electResult = client.electPreferredLeaders(asList(partition1), shortTimeout) + assertEquals(Set(partition1).asJava, electResult.partitions.get) + try { + electResult.partitionResult(partition1).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + assertEquals(2, currentLeader(partition1)) + + // preferred leader unavailable with null argument + electResult = client.electPreferredLeaders(null, shortTimeout) + try { + electResult.partitions.get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + } + 
try { + electResult.partitionResult(partition1).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-1-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + try { + electResult.partitionResult(partition2).get() + fail() + } catch { + case e: Exception => + val cause = e.getCause + assertTrue(cause.getClass.getName, cause.isInstanceOf[LeaderNotAvailableException]) + assertTrue(s"Wrong message ${cause.getMessage}", cause.getMessage.contains( + "Failed to elect leader for partition elect-preferred-leaders-topic-2-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy")) + } + + assertEquals(2, currentLeader(partition1)) + assertEquals(2, currentLeader(partition2)) + } } object AdminClientIntegrationTest { diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 4c0459e..ad7fdbb 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -146,7 +146,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse], ApiKeys.ALTER_REPLICA_LOG_DIRS -> classOf[AlterReplicaLogDirsResponse], ApiKeys.DESCRIBE_LOG_DIRS -> classOf[DescribeLogDirsResponse], - ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse] + ApiKeys.CREATE_PARTITIONS -> classOf[CreatePartitionsResponse], + ApiKeys.ELECT_PREFERRED_LEADERS -> classOf[ElectPreferredLeadersResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -187,7 +188,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ALTER_REPLICA_LOG_DIRS -> 
((resp: AlterReplicaLogDirsResponse) => resp.responses.get(tp)), ApiKeys.DESCRIBE_LOG_DIRS -> ((resp: DescribeLogDirsResponse) => if (resp.logDirInfos.size() > 0) resp.logDirInfos.asScala.head._2.error else Errors.CLUSTER_AUTHORIZATION_FAILED), - ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error) + ApiKeys.CREATE_PARTITIONS -> ((resp: CreatePartitionsResponse) => resp.errors.asScala.find(_._1 == topic).get._2.error), + ApiKeys.ELECT_PREFERRED_LEADERS -> ((resp: ElectPreferredLeadersResponse) => + ElectPreferredLeadersRequest.fromResponseData(resp.data()).get(tp).error()) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -225,7 +228,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.DELETE_ACLS -> clusterAlterAcl, ApiKeys.ALTER_REPLICA_LOG_DIRS -> clusterAlterAcl, ApiKeys.DESCRIBE_LOG_DIRS -> clusterDescribeAcl, - ApiKeys.CREATE_PARTITIONS -> topicAlterAcl + ApiKeys.CREATE_PARTITIONS -> topicAlterAcl, + ApiKeys.ELECT_PREFERRED_LEADERS -> clusterAlterAcl ) @Before @@ -382,6 +386,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { private def addOffsetsToTxnRequest = new AddOffsetsToTxnRequest.Builder(transactionalId, 1, 1, group).build() + private def electPreferredLeadersRequest = new ElectPreferredLeadersRequest.Builder( + ElectPreferredLeadersRequest.toRequestData(Collections.singleton(tp), 10000)).build() + @Test def testAuthorizationWithTopicExisting() { val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest]( @@ -414,9 +421,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, ApiKeys.ADD_PARTITIONS_TO_TXN -> addPartitionsToTxnRequest, ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, - // Check StopReplica last since some APIs depend on replica availability - ApiKeys.STOP_REPLICA -> stopReplicaRequest + ApiKeys.STOP_REPLICA -> stopReplicaRequest, + 
ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest ) for ((key, request) <- requestKeyToRequest) { @@ -462,7 +469,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_OFFSETS_TO_TXN -> addOffsetsToTxnRequest, ApiKeys.CREATE_PARTITIONS -> createPartitionsRequest, ApiKeys.DELETE_GROUPS -> deleteGroupsRequest, - ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest + ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, + ApiKeys.ELECT_PREFERRED_LEADERS -> electPreferredLeadersRequest ) for ((key, request) <- requestKeyToRequest) { diff --git a/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala new file mode 100644 index 0000000..824e8fb --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/PreferredReplicaLeaderElectionCommandTest.scala @@ -0,0 +1,337 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package kafka.admin
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, Paths}
+import java.util.Properties
+
+import kafka.admin.PreferredReplicaLeaderElectionCommand
+import kafka.common.{AdminCommandFailedException, TopicAndPartition}
+import kafka.network.RequestChannel
+import kafka.security.auth._
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.utils.{Logging, TestUtils, ZkUtils}
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.{ClusterAuthorizationException, PreferredLeaderNotAvailableException, TimeoutException, UnknownTopicOrPartitionException}
+import org.apache.kafka.common.network.ListenerName
+import org.junit.Assert._
+import org.junit.{After, Test}
+
+/**
+ * Tests for [[PreferredReplicaLeaderElectionCommand]] when it is invoked with `--bootstrap-server`.
+ *
+ * Tests that need a cluster start three brokers with `auto.leader.rebalance.enable=false`, so
+ * partition leadership only moves when a broker is bounced or an election is requested.
+ */
+class PreferredReplicaLeaderElectionCommandTest extends ZooKeeperTestHarness with Logging {
+
+  /** Brokers started by the current test; shut down in [[tearDown]]. */
+  var servers: Seq[KafkaServer] = Seq()
+
+  @After
+  override def tearDown(): Unit = {
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+  }
+
+  /**
+   * Starts three brokers with automatic leader rebalancing disabled and creates the given
+   * partitions with the given replica assignments.
+   *
+   * @param topicPartition replica assignment for every partition to create
+   * @param authorizer     optional authorizer class name to set on every broker
+   */
+  private def createTestTopicAndCluster(topicPartition: Map[TopicPartition, List[Int]],
+                                        authorizer: Option[String] = None): Unit = {
+    val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
+    brokerConfigs.foreach { p =>
+      p.setProperty("auto.leader.rebalance.enable", "false")
+      authorizer.foreach(className => p.setProperty("authorizer.class.name", className))
+    }
+    createTestTopicAndCluster(topicPartition, brokerConfigs)
+  }
+
+  /**
+   * Starts one broker per entry of `brokerConfigs`, creates the given partitions and blocks
+   * until every broker has a log for every partition.
+   *
+   * @param partitionsAndAssignments replica assignment for every partition to create
+   * @param brokerConfigs            one set of broker properties per broker to start
+   */
+  private def createTestTopicAndCluster(partitionsAndAssignments: Map[TopicPartition, List[Int]],
+                                        brokerConfigs: Seq[Properties]): Unit = {
+    // create brokers
+    servers = brokerConfigs.map(b => TestUtils.createServer(KafkaConfig.fromProps(b)))
+    // create the topics
+    partitionsAndAssignments.foreach { case (tp, assignment) =>
+      zkClient.createTopicAssignment(tp.topic(), Map(tp -> assignment))
+    }
+    // wait until the replica log is created on every broker
+    TestUtils.waitUntilTrue(
+      () => servers.forall { server =>
+        partitionsAndAssignments.keys.forall(tp => server.getLogManager().getLog(tp).isDefined)
+      },
+      s"Replicas for partitions ${partitionsAndAssignments.keys.mkString(", ")} not created")
+  }
+
+  /**
+   * Bounces the given targetServer and waits until every server's metadata cache lists it in
+   * the ISR of the given partition again.
+   *
+   * `targetServer` is used as an index into `servers` and as the broker id looked up in the
+   * ISR, so this assumes broker ids equal their position in `servers` -- TODO confirm.
+   */
+  private def bounceServer(targetServer: Int, partition: TopicPartition): Unit = {
+    debug(s"Shutting down server $targetServer so a non-preferred replica becomes leader")
+    servers(targetServer).shutdown()
+    debug(s"Starting server $targetServer now that a non-preferred replica is leader")
+    servers(targetServer).startup()
+    TestUtils.waitUntilTrue(
+      () => servers.forall { server =>
+        server.metadataCache.getPartitionInfo(partition.topic(), partition.partition()).exists { partitionState =>
+          partitionState.basePartitionState.isr.contains(targetServer)
+        }
+      },
+      s"Server $targetServer did not rejoin the ISR of partition $partition on every server")
+  }
+
+  /** Returns the server whose controller is currently active, if any. */
+  private def getController() = {
+    servers.find(p => p.kafkaController.isActive)
+  }
+
+  /** Returns the leader of the given partition as seen by the metadata cache of `servers(0)`. */
+  private def getLeader(topicPartition: TopicPartition) = {
+    servers(0).metadataCache.getPartitionInfo(topicPartition.topic(), topicPartition.partition()).get.basePartitionState.leader
+  }
+
+  /** Returns `localhost:<port>` for the PLAINTEXT listener of `servers(broker)`. */
+  private def bootstrapServer(broker: Int = 0): String = {
+    val port = servers(broker).socketServer.boundPort(ListenerName.normalised("PLAINTEXT"))
+    debug(s"Server bound to port $port")
+    s"localhost:$port"
+  }
+
+  val testPartition = new TopicPartition("test", 0)
+  val testPartitionAssignment = List(1, 2, 0)
+  // The first replica in the assignment is the preferred leader
+  val testPartitionPreferredLeader = testPartitionAssignment.head
+  val testPartitionAndAssignment = Map(testPartition -> testPartitionAssignment)
+
+  /** Test the case where multiple values are given for --bootstrap-server */
+  @Test
+  def testMultipleBrokersGiven(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    PreferredReplicaLeaderElectionCommand.run(Array(
+      "--bootstrap-server", s"${bootstrapServer(1)},${bootstrapServer(0)}"))
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /** Test the case when an invalid broker is given for --bootstrap-server */
+  @Test
+  def testInvalidBrokerGiven(): Unit = {
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", "example.com:1234"),
+        timeout = 1000)
+      fail("Expected AdminCommandFailedException for an unreachable bootstrap server")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertTrue(e.getCause.isInstanceOf[TimeoutException])
+    }
+  }
+
+  /** Test the case where no partitions are given (=> elect all partitions) */
+  @Test
+  def testNoPartitionsGiven(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    PreferredReplicaLeaderElectionCommand.run(Array(
+      "--bootstrap-server", bootstrapServer()))
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /**
+   * Writes the given partitions to a temporary file in the JSON format produced by
+   * `ZkUtils.preferredReplicaLeaderElectionZkData`, for use with `--path-to-json-file`.
+   * The file is registered for deletion on JVM exit; callers also delete it explicitly.
+   */
+  private def toJsonFile(partitions: scala.collection.Set[TopicPartition]): File = {
+    val jsonFile = File.createTempFile("preferredreplicaelection", ".js")
+    jsonFile.deleteOnExit()
+    val jsonString = ZkUtils.preferredReplicaLeaderElectionZkData(partitions.map(new TopicAndPartition(_)))
+    debug(s"Using json: $jsonString")
+    Files.write(Paths.get(jsonFile.getAbsolutePath), jsonString.getBytes(StandardCharsets.UTF_8))
+    jsonFile
+  }
+
+  /** Test the case where a list containing a single partition is given */
+  @Test
+  def testSingletonPartitionGiven(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    assertNotEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+    } finally {
+      jsonFile.delete()
+    }
+    // Check the leader for the partition IS the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+  }
+
+  /** Test the case where a topic does not exist */
+  @Test
+  def testTopicDoesNotExist(): Unit = {
+    val nonExistentPartition = new TopicPartition("does.not.exist", 0)
+
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    val jsonFile = toJsonFile(Set(nonExistentPartition))
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      // Without this the test would pass vacuously when the command reports no error
+      fail("Expected AdminCommandFailedException for a partition that does not exist")
+    } catch {
+      case e: AdminCommandFailedException =>
+        val suppressed = e.getSuppressed()(0)
+        assertTrue(suppressed.isInstanceOf[UnknownTopicOrPartitionException])
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /** Test the case where several partitions are given */
+  @Test
+  def testMultiplePartitionsSameAssignment(): Unit = {
+    val testPartitionA = new TopicPartition("testA", 0)
+    val testPartitionB = new TopicPartition("testB", 0)
+    // Distinct names so the class-level single-partition fixtures are not shadowed
+    val sharedAssignment = List(1, 2, 0)
+    val preferredLeader = sharedAssignment.head
+    val partitionsAndAssignment = Map(testPartitionA -> sharedAssignment, testPartitionB -> sharedAssignment)
+
+    createTestTopicAndCluster(partitionsAndAssignment)
+    bounceServer(preferredLeader, testPartitionA)
+    // Check the leader for the partitions is not the preferred one
+    assertNotEquals(preferredLeader, getLeader(testPartitionA))
+    assertNotEquals(preferredLeader, getLeader(testPartitionB))
+    val jsonFile = toJsonFile(partitionsAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+    } finally {
+      jsonFile.delete()
+    }
+    // Check the leader for the partitions IS the preferred one
+    assertEquals(preferredLeader, getLeader(testPartitionA))
+    assertEquals(preferredLeader, getLeader(testPartitionB))
+  }
+
+  /** What happens when the preferred replica is already the leader? */
+  @Test
+  def testNoopElection(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    // Don't bounce the server. Double check the leader for the partition is the preferred one
+    assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      // Now do the election, even though the preferred replica is *already* the leader
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      // Check the leader for the partition still is the preferred one
+      assertEquals(testPartitionPreferredLeader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /** What happens if the preferred replica is offline? */
+  @Test
+  def testWithOfflinePreferredReplica(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    val leader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, leader)
+    // Now kill the preferred one
+    servers(testPartitionPreferredLeader).shutdown()
+    // Now try to elect the preferred one
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      fail("Expected AdminCommandFailedException because the preferred replica is offline")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        val suppressed = e.getSuppressed()(0)
+        assertTrue(suppressed.isInstanceOf[PreferredLeaderNotAvailableException])
+        assertTrue(suppressed.getMessage, suppressed.getMessage.contains("Failed to elect leader for partition test-0 under strategy PreferredReplicaPartitionLeaderElectionStrategy"))
+        // Check we still have the same leader
+        assertEquals(leader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /** What happens if the controller gets killed just before an election? */
+  @Test
+  def testTimeout(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment)
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    val leader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, leader)
+    // Now kill the controller just before we trigger the election
+    val controller = getController().get.config.brokerId
+    servers(controller).shutdown()
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      // The command is deliberately pointed at the broker that was just shut down
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(controller),
+        "--path-to-json-file", jsonFile.getAbsolutePath),
+        timeout = 2000)
+      fail("Expected AdminCommandFailedException because the bootstrap broker is down")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        assertTrue(e.getSuppressed()(0).getMessage.contains("Timed out waiting for a node assignment"))
+        // Check we still have the same leader
+        assertEquals(leader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+  /** Test the case where client is not authorized */
+  @Test
+  def testAuthzFailure(): Unit = {
+    createTestTopicAndCluster(testPartitionAndAssignment, Some(classOf[PreferredReplicaLeaderElectionCommandTestAuthorizer].getName))
+    bounceServer(testPartitionPreferredLeader, testPartition)
+    // Check the leader for the partition is not the preferred one
+    val leader = getLeader(testPartition)
+    assertNotEquals(testPartitionPreferredLeader, leader)
+    val jsonFile = toJsonFile(testPartitionAndAssignment.keySet)
+    try {
+      PreferredReplicaLeaderElectionCommand.run(Array(
+        "--bootstrap-server", bootstrapServer(),
+        "--path-to-json-file", jsonFile.getAbsolutePath))
+      fail("Expected AdminCommandFailedException because the client is not authorized")
+    } catch {
+      case e: AdminCommandFailedException =>
+        assertEquals("1 preferred replica(s) could not be elected", e.getMessage)
+        assertTrue(e.getSuppressed()(0).isInstanceOf[ClusterAuthorizationException])
+        // Check we still have the same leader
+        assertEquals(leader, getLeader(testPartition))
+    } finally {
+      jsonFile.delete()
+    }
+  }
+
+}
+
+/**
+ * Authorizer used by `testAuthzFailure`: denies `Alter` on a `Cluster` resource and allows
+ * every other operation/resource combination.
+ */
+class PreferredReplicaLeaderElectionCommandTestAuthorizer extends SimpleAclAuthorizer {
+  override def authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean =
+    operation != Alter || resource.resourceType != Cluster
+}
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index d4dcd9f..d5becea 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -158,7 +158,7 @@ object AbstractCoordinatorConcurrencyTest { } class TestReplicaManager extends ReplicaManager( - null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) { + null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, None) { var producePurgatory: DelayedOperationPurgatory[DelayedProduce] = _ var watchKeys: mutable.Set[TopicPartitionOperationKey] = _ diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 9b4210e..a3ecb07 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -524,7 +524,7 @@ class KafkaApisTest { channel.buffer.getInt() // read the size ResponseHeader.parse(channel.buffer) val struct = api.responseSchema(request.version).read(channel.buffer) - AbstractResponse.parseResponse(api, struct) + AbstractResponse.parseResponse(api, struct, request.version) } private def expectNoThrottling(): Capture[RequestChannel.Response] = { diff
--git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index 1308820..08aa624 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -657,6 +657,8 @@ class ReplicaManagerTest { purgatoryName = "Fetch", timer, reaperEnabled = false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", timer, reaperEnabled = false) + val mockElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = "ElectPreferredLeader", timer, reaperEnabled = false) // Mock network client to show leader offset of 5 val quota = QuotaFactory.instantiate(config, metrics, time, "") @@ -665,7 +667,7 @@ class ReplicaManagerTest { val replicaManager = new ReplicaManager(config, metrics, time, kafkaZkClient, mockScheduler, mockLogMgr, new AtomicBoolean(false), quota, mockBrokerTopicStats, metadataCache, mockLogDirFailureChannel, mockProducePurgatory, mockFetchPurgatory, - mockDeleteRecordsPurgatory, Option(this.getClass.getName)) { + mockDeleteRecordsPurgatory, mockElectPreferredLeaderPurgatory, Option(this.getClass.getName)) { override protected def createReplicaFetcherManager(metrics: Metrics, time: Time, @@ -815,11 +817,13 @@ class ReplicaManagerTest { purgatoryName = "Fetch", timer, reaperEnabled = false) val mockDeleteRecordsPurgatory = new DelayedOperationPurgatory[DelayedDeleteRecords]( purgatoryName = "DeleteRecords", timer, reaperEnabled = false) + val mockDelayedElectPreferredLeaderPurgatory = new DelayedOperationPurgatory[DelayedElectPreferredLeader]( + purgatoryName = "DelayedElectPreferredLeader", timer, reaperEnabled = false) new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new 
BrokerTopicStats, metadataCache, new LogDirFailureChannel(config.logDirs.size), mockProducePurgatory, mockFetchPurgatory, - mockDeleteRecordsPurgatory, Option(this.getClass.getName)) + mockDeleteRecordsPurgatory, mockDelayedElectPreferredLeaderPurgatory, Option(this.getClass.getName)) } } diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index c9e4e78..3176f72 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,6 +24,7 @@ import kafka.security.auth._ import kafka.utils.TestUtils import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource +import org.apache.kafka.common.message.ElectPreferredLeadersRequestData import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.metrics.{KafkaMetric, Quota, Sensor} @@ -359,6 +360,15 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.DELETE_GROUPS => new DeleteGroupsRequest.Builder(Collections.singleton("test-group")) + case ApiKeys.ELECT_PREFERRED_LEADERS => + val partition = new ElectPreferredLeadersRequestData.TopicPartitions() + .setPartitionId(Collections.singletonList(0)) + .setTopic("my_topic") + new ElectPreferredLeadersRequest.Builder( + new ElectPreferredLeadersRequestData() + .setTimeoutMs(0) + .setTopicPartitions(Collections.singletonList(partition))) + case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey) } @@ -450,6 +460,7 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.RENEW_DELEGATION_TOKEN => new RenewDelegationTokenResponse(response).throttleTimeMs case ApiKeys.DELETE_GROUPS => new 
DeleteGroupsResponse(response).throttleTimeMs case ApiKeys.OFFSET_FOR_LEADER_EPOCH => new OffsetsForLeaderEpochResponse(response).throttleTimeMs + case ApiKeys.ELECT_PREFERRED_LEADERS => new ElectPreferredLeadersResponse(response, 0).throttleTimeMs case requestId => throw new IllegalArgumentException(s"No throttle time for $requestId") } }