KAFKA-3307; Add ApiVersions Request/Response and server side handling. The patch does the following: 1. Adds ApiVersionsRequest/Response. 2. Adds UNSUPPORTED_VERSION error and UnsupportedVersionException. 3. Adds broker-side handling of ApiVersionsRequest.
Author: Ashish Singh <[email protected]> Reviewers: Gwen Shapira, Ismael Juma, Magnus Edenhill Closes #986 from SinghAsDev/KAFKA-3307 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8407dac6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8407dac6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8407dac6 Branch: refs/heads/0.10.0 Commit: 8407dac6ee409d832c95533e6f1d5578511232ae Parents: 4c76b5f Author: Ashish Singh <[email protected]> Authored: Wed Apr 27 11:28:32 2016 -0700 Committer: Gwen Shapira <[email protected]> Committed: Wed Apr 27 11:28:32 2016 -0700 ---------------------------------------------------------------------- .../errors/UnsupportedVersionException.java | 25 ++++ .../apache/kafka/common/protocol/ApiKeys.java | 5 +- .../apache/kafka/common/protocol/Errors.java | 5 +- .../apache/kafka/common/protocol/Protocol.java | 41 ++++++- .../kafka/common/requests/AbstractRequest.java | 4 +- .../common/requests/ApiVersionsRequest.java | 55 +++++++++ .../common/requests/ApiVersionsResponse.java | 116 +++++++++++++++++++ .../common/requests/RequestResponseTest.java | 16 ++- .../src/main/scala/kafka/server/KafkaApis.scala | 77 +++++++----- .../unit/kafka/server/ApiVersionsTest.scala | 51 ++++++++ 10 files changed, 357 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java new file mode 100644 index 0000000..3679be4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java @@ -0,0 +1,25 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.errors; + +public class UnsupportedVersionException extends ApiException { + private static final long serialVersionUID = 1L; + + public UnsupportedVersionException(String message, Throwable cause) { + super(message, cause); + } + + public UnsupportedVersionException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 512a121..aeb0b45 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -37,7 +37,8 @@ public enum ApiKeys { SYNC_GROUP(14, "SyncGroup"), DESCRIBE_GROUPS(15, "DescribeGroups"), LIST_GROUPS(16, "ListGroups"), - SASL_HANDSHAKE(17, "SaslHandshake"); + SASL_HANDSHAKE(17, "SaslHandshake"), + API_VERSIONS(18, "ApiVersions"); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY 
= 0; @@ -97,4 +98,4 @@ public enum ApiKeys { System.out.println(toHtml()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index 9013399..64a709e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -53,6 +53,7 @@ import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.UnsupportedSaslMechanismException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.errors.UnknownMemberIdException; import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; @@ -136,7 +137,9 @@ public enum Errors { UNSUPPORTED_SASL_MECHANISM(33, new UnsupportedSaslMechanismException("The broker does not support the requested SASL mechanism.")), ILLEGAL_SASL_STATE(34, - new IllegalSaslStateException("Request is not valid given the current SASL state.")); + new IllegalSaslStateException("Request is not valid given the current SASL state.")), + UNSUPPORTED_VERSION(35, + new UnsupportedVersionException("The version of API is not supported.")); private static final Logger log = LoggerFactory.getLogger(Errors.class); http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index d322095..99cdbf9 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -759,10 +759,24 @@ public class Protocol { public static final Schema[] SASL_HANDSHAKE_REQUEST = new Schema[] {SASL_HANDSHAKE_REQUEST_V0}; public static final Schema[] SASL_HANDSHAKE_RESPONSE = new Schema[] {SASL_HANDSHAKE_RESPONSE_V0}; + /* ApiVersion api */ + public static final Schema API_VERSIONS_REQUEST_V0 = new Schema(); + + public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."), + new Field("min_version", INT16, "Minimum supported version."), + new Field("max_version", INT16, "Maximum supported version.")); + + public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."), + new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker.")); + + public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0}; + public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0}; + /* an array of all requests and responses with all schema versions; a null value in the inner array means that the * particular version is not supported */ public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][]; public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][]; + public static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1]; /* the latest version of each api */ public static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1]; @@ -786,6 +800,7 @@ public class Protocol { REQUESTS[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_REQUEST; REQUESTS[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_REQUEST; REQUESTS[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_REQUEST; + 
REQUESTS[ApiKeys.API_VERSIONS.id] = API_VERSIONS_REQUEST; RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE; RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE; @@ -805,16 +820,32 @@ public class Protocol { RESPONSES[ApiKeys.DESCRIBE_GROUPS.id] = DESCRIBE_GROUPS_RESPONSE; RESPONSES[ApiKeys.LIST_GROUPS.id] = LIST_GROUPS_RESPONSE; RESPONSES[ApiKeys.SASL_HANDSHAKE.id] = SASL_HANDSHAKE_RESPONSE; + RESPONSES[ApiKeys.API_VERSIONS.id] = API_VERSIONS_RESPONSE; - /* set the maximum version of each api */ - for (ApiKeys api : ApiKeys.values()) + /* set the minimum and maximum version of each api */ + for (ApiKeys api : ApiKeys.values()) { CURR_VERSION[api.id] = (short) (REQUESTS[api.id].length - 1); + for (int i = 0; i < REQUESTS[api.id].length; ++i) + if (REQUESTS[api.id][i] != null) { + MIN_VERSIONS[api.id] = (short) i; + break; + } + } - /* sanity check that we have the same number of request and response versions for each api */ - for (ApiKeys api : ApiKeys.values()) + /* sanity check that: + * - we have the same number of request and response versions for each api + * - we have a consistent set of request and response versions for each api */ + for (ApiKeys api : ApiKeys.values()) { if (REQUESTS[api.id].length != RESPONSES[api.id].length) throw new IllegalStateException(REQUESTS[api.id].length + " request versions for api " + api.name + " but " + RESPONSES[api.id].length + " response versions."); + + for (int i = 0; i < REQUESTS[api.id].length; ++i) + if ((REQUESTS[api.id][i] == null && RESPONSES[api.id][i] != null) || + (REQUESTS[api.id][i] != null && RESPONSES[api.id][i] == null)) + throw new IllegalStateException("Request and response for version " + i + " of API " + + api.id + " are defined inconsistently. 
One is null while the other is not null."); + } } private static String indentString(int size) { @@ -977,4 +1008,4 @@ public class Protocol { System.out.println(toHtml()); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 89c2ce1..ab61c66 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -74,9 +74,11 @@ public abstract class AbstractRequest extends AbstractRequestResponse { return ListGroupsRequest.parse(buffer, versionId); case SASL_HANDSHAKE: return SaslHandshakeRequest.parse(buffer, versionId); + case API_VERSIONS: + return ApiVersionsRequest.parse(buffer, versionId); default: throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " + "code should be updated to do so.", apiKey)); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java new file mode 100644 index 0000000..b78c759 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. 
See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.Collections; + +public class ApiVersionsRequest extends AbstractRequest { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentRequestSchema(ApiKeys.API_VERSIONS.id); + + public ApiVersionsRequest() { + super(new Struct(CURRENT_SCHEMA)); + } + + public ApiVersionsRequest(Struct struct) { + super(struct); + } + + @Override + public AbstractRequestResponse getErrorResponse(int versionId, Throwable e) { + switch (versionId) { + case 0: + short errorCode = Errors.forException(e).code(); + return new ApiVersionsResponse(errorCode, Collections.<ApiVersionsResponse.ApiVersion>emptyList()); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", + versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id))); + } + } + + public static ApiVersionsRequest parse(ByteBuffer buffer, int versionId) { + return new ApiVersionsRequest(ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, buffer)); + } + + public static ApiVersionsRequest parse(ByteBuffer buffer) { + return new ApiVersionsRequest(CURRENT_SCHEMA.read(buffer)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java new file mode 100644 index 0000000..36881a3 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.ProtoUtils; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ApiVersionsResponse extends AbstractRequestResponse { + + private static final Schema CURRENT_SCHEMA = ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id); + + public static final String ERROR_CODE_KEY_NAME = "error_code"; + public static final String API_VERSIONS_KEY_NAME = "api_versions"; + public static final String API_KEY_NAME = "api_key"; + public static final String MIN_VERSION_KEY_NAME = "min_version"; + public static final String MAX_VERSION_KEY_NAME = "max_version"; + + /** + * Possible error codes: + * + * UNSUPPORTED_VERSION (35) + */ + private final short errorCode; + private final Map<Short, ApiVersion> apiKeyToApiVersion; + + public static final class ApiVersion { + public final short apiKey; + public final short minVersion; + public final short maxVersion; + + public ApiVersion(short apiKey, short minVersion, short maxVersion) { + this.apiKey = apiKey; + this.minVersion = minVersion; + this.maxVersion = maxVersion; + } + } + + public ApiVersionsResponse(short errorCode, List<ApiVersion> apiVersions) { + super(new Struct(CURRENT_SCHEMA)); + struct.set(ERROR_CODE_KEY_NAME, errorCode); + List<Struct> apiVersionList = new ArrayList<>(); + for (ApiVersion apiVersion : apiVersions) { + Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME); + apiVersionStruct.set(API_KEY_NAME, apiVersion.apiKey); + apiVersionStruct.set(MIN_VERSION_KEY_NAME, apiVersion.minVersion); + apiVersionStruct.set(MAX_VERSION_KEY_NAME, 
apiVersion.maxVersion); + apiVersionList.add(apiVersionStruct); + } + struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray()); + this.errorCode = errorCode; + this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions); + } + + public ApiVersionsResponse(Struct struct) { + super(struct); + this.errorCode = struct.getShort(ERROR_CODE_KEY_NAME); + List<ApiVersion> tempApiVersions = new ArrayList<>(); + for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) { + Struct apiVersionStruct = (Struct) apiVersionsObj; + short apiKey = apiVersionStruct.getShort(API_KEY_NAME); + short minVersion = apiVersionStruct.getShort(MIN_VERSION_KEY_NAME); + short maxVersion = apiVersionStruct.getShort(MAX_VERSION_KEY_NAME); + tempApiVersions.add(new ApiVersion(apiKey, minVersion, maxVersion)); + } + this.apiKeyToApiVersion = buildApiKeyToApiVersion(tempApiVersions); + } + + public Collection<ApiVersion> apiVersions() { + return apiKeyToApiVersion.values(); + } + + public ApiVersion apiVersion(short apiKey) { + return apiKeyToApiVersion.get(apiKey); + } + + public short errorCode() { + return errorCode; + } + + public static ApiVersionsResponse parse(ByteBuffer buffer) { + return new ApiVersionsResponse(CURRENT_SCHEMA.read(buffer)); + } + + public static ApiVersionsResponse fromError(Errors error) { + return new ApiVersionsResponse(error.code(), Collections.<ApiVersion>emptyList()); + } + + private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { + Map<Short, ApiVersion> tempApiIdToApiVersion = new HashMap<>(); + for (ApiVersion apiVersion: apiVersions) { + tempApiIdToApiVersion.put(apiVersion.apiKey, apiVersion); + } + return tempApiIdToApiVersion; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 0018f53..345de3f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -95,7 +95,10 @@ public class RequestResponseTest { createLeaderAndIsrResponse(), createSaslHandshakeRequest(), createSaslHandshakeRequest().getErrorResponse(0, new UnknownServerException()), - createSaslHandshakeResponse() + createSaslHandshakeResponse(), + createApiVersionRequest(), + createApiVersionRequest().getErrorResponse(0, new UnknownServerException()), + createApiVersionResponse() ); for (AbstractRequestResponse req : requestResponseList) @@ -438,4 +441,13 @@ public class RequestResponseTest { private AbstractRequestResponse createSaslHandshakeResponse() { return new SaslHandshakeResponse(Errors.NONE.code(), Collections.singletonList("GSSAPI")); } -} + + private AbstractRequest createApiVersionRequest() { + return new ApiVersionsRequest(); + } + + private AbstractRequestResponse createApiVersionResponse() { + List<ApiVersionsResponse.ApiVersion> apiVersions = Arrays.asList(new ApiVersionsResponse.ApiVersion((short) 0, (short) 0, (short) 2)); + return new ApiVersionsResponse(Errors.NONE.code(), apiVersions); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 406b1bd..67d46fc 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import java.lang.{Long => JLong, Short => JShort} import java.util.Properties -import 
kafka.admin.{RackAwareMode, AdminUtils} +import kafka.admin.{AdminUtils, RackAwareMode} import kafka.api._ import kafka.cluster.Partition import kafka.common @@ -31,27 +31,32 @@ import kafka.coordinator.{GroupCoordinator, JoinGroupResult} import kafka.log._ import kafka.message.{ByteBufferMessageSet, Message, MessageSet} import kafka.network._ -import kafka.network.RequestChannel.{Session, Response} -import kafka.security.auth.{Authorizer, ClusterAction, Group, Create, Describe, Operation, Read, Resource, Topic, Write} +import kafka.network.RequestChannel.{Response, Session} +import kafka.security.auth.{Authorizer, ClusterAction, Create, Describe, Group, Operation, Read, Resource, Topic, Write} import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils} -import org.apache.kafka.common.errors.{InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, -ClusterAuthorizationException} +import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException} import org.apache.kafka.common.metrics.Metrics -import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} -import org.apache.kafka.common.requests.{ListOffsetRequest, ListOffsetResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, ListGroupsResponse, -DescribeGroupsRequest, DescribeGroupsResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, -LeaveGroupRequest, LeaveGroupResponse, ResponseHeader, ResponseSend, SyncGroupRequest, SyncGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, -StopReplicaRequest, StopReplicaResponse, ProduceRequest, ProduceResponse, UpdateMetadataRequest, UpdateMetadataResponse, -MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse} +import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol} +import 
org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.{TopicPartition, Node} +import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.internals.TopicConstants import scala.collection._ import scala.collection.JavaConverters._ import org.apache.kafka.common.requests.SaslHandshakeResponse +object KafkaApis { + val apiVersionsResponse = new ApiVersionsResponse(Errors.NONE.code, buildApiKeysToApiVersions.values.toList.asJava) + + private def buildApiKeysToApiVersions: Map[Short, ApiVersionsResponse.ApiVersion] = { + ApiKeys.values.map(apiKey => + apiKey.id -> new ApiVersionsResponse.ApiVersion(apiKey.id, Protocol.MIN_VERSIONS(apiKey.id), Protocol.CURR_VERSION(apiKey.id))).toMap + } +} + + /** * Logic to handle the various Kafka requests */ @@ -74,7 +79,7 @@ class KafkaApis(val requestChannel: RequestChannel, * Top-level method that handles all requests and multiplexes to the right api */ def handle(request: RequestChannel.Request) { - try{ + try { trace("Handling request:%s from connection %s;securityProtocol:%s,principal:%s". 
format(request.requestObj, request.connectionId, request.securityProtocol, request.session.principal)) ApiKeys.forId(request.requestId) match { @@ -96,6 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request) case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request) case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request) + case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request) case requestId => throw new KafkaException("Unknown api code " + requestId) } } catch { @@ -143,7 +149,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val responseHeader = new ResponseHeader(correlationId) - val leaderAndIsrResponse= + val leaderAndIsrResponse = if (authorize(request.session, ClusterAction, Resource.ClusterResource)) { val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange) new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava) @@ -234,7 +240,7 @@ class KafkaApis(val requestChannel: RequestChannel, } val filteredRequestInfo = offsetCommitRequest.offsetData.asScala.toMap -- invalidRequestsInfo.keys - val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { + val (authorizedRequestInfo, unauthorizedRequestInfo) = filteredRequestInfo.partition { case (topicPartition, offsetMetadata) => authorize(request.session, Read, new Resource(Topic, topicPartition.topic)) } @@ -251,7 +257,7 @@ class KafkaApis(val requestChannel: RequestChannel, val combinedCommitStatus = mergedCommitStatus.mapValues(new JShort(_)) ++ invalidRequestsInfo.map(_._1 -> new JShort(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)) val responseHeader = new ResponseHeader(header.correlationId) - val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) + val responseBody = new OffsetCommitResponse(combinedCommitStatus.asJava) requestChannel.sendResponse(new 
RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) } @@ -376,7 +382,7 @@ class KafkaApis(val requestChannel: RequestChannel, val respHeader = new ResponseHeader(request.header.correlationId) val respBody = request.header.apiVersion match { case 0 => new ProduceResponse(mergedResponseStatus.asJava) - case version@ (1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version) + case version@(1 | 2) => new ProduceResponse(mergedResponseStatus.asJava, delayTimeMs, version) // This case shouldn't happen unless a new version of ProducerRequest is added without // updating this part of the code to handle it properly. case version => throw new IllegalArgumentException(s"Version `$version` of ProduceRequest is not handled. Code must be updated.") @@ -426,7 +432,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleFetchRequest(request: RequestChannel.Request) { val fetchRequest = request.requestObj.asInstanceOf[FetchRequest] - val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition { + val (authorizedRequestInfo, unauthorizedRequestInfo) = fetchRequest.requestInfo.partition { case (topicAndPartition, _) => authorize(request.session, Read, new Resource(Topic, topicAndPartition.topic)) } @@ -552,14 +558,14 @@ class KafkaApis(val requestChannel: RequestChannel, case utpe: UnknownTopicOrPartitionException => debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( correlationId, clientId, topicPartition, utpe.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(utpe).code, List[JLong]().asJava)) case nle: NotLeaderForPartitionException => debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format( correlationId, clientId, 
topicPartition,nle.getMessage)) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(nle).code, List[JLong]().asJava)) case e: Throwable => error("Error while responding to offset request", e) - (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava)) + (topicPartition, new ListOffsetResponse.PartitionData(Errors.forException(e).code, List[JLong]().asJava)) } }) @@ -591,7 +597,7 @@ class KafkaApis(val requestChannel: RequestChannel, else offsetTimeArray = new Array[(Long, Long)](segsArray.length) - for(i <- 0 until segsArray.length) + for (i <- 0 until segsArray.length) offsetTimeArray(i) = (segsArray(i).baseOffset, segsArray(i).lastModified) if (segsArray.last.size > 0) offsetTimeArray(segsArray.length) = (log.logEndOffset, SystemTime.milliseconds) @@ -610,18 +616,18 @@ class KafkaApis(val requestChannel: RequestChannel, if (offsetTimeArray(startIndex)._2 <= timestamp) isFound = true else - startIndex -=1 + startIndex -= 1 } } val retSize = maxNumOffsets.min(startIndex + 1) val ret = new Array[Long](retSize) - for(j <- 0 until retSize) { + for (j <- 0 until retSize) { ret(j) = offsetTimeArray(startIndex)._1 startIndex -= 1 } // ensure that the returned seq is in descending order of offsets - ret.toSeq.sortBy(- _) + ret.toSeq.sortBy(-_) } private def createTopic(topic: String, @@ -871,7 +877,7 @@ class KafkaApis(val requestChannel: RequestChannel, ListGroupsResponse.fromError(Errors.CLUSTER_AUTHORIZATION_FAILED) } else { val (error, groups) = coordinator.handleListGroups() - val allGroups = groups.map{ group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } + val allGroups = groups.map { group => new ListGroupsResponse.Group(group.groupId, group.protocolType) } new ListGroupsResponse(error.code, allGroups.asJava) } requestChannel.sendResponse(new 
RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) @@ -1024,6 +1030,23 @@ class KafkaApis(val requestChannel: RequestChannel, requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, respHeader, response))) } + def handleApiVersionsRequest(request: RequestChannel.Request) { + // Note that broker returns its full list of supported ApiKeys and versions regardless of current + // authentication state (e.g., before SASL authentication on an SASL listener, do note that no + // Kafka protocol requests may take place on a SSL listener before the SSL handshake is finished). + // If this is considered to leak information about the broker version a workaround is to use SSL + // with client authentication which is performed at an earlier stage of the connection where the + // ApiVersionRequest is not available. + val responseHeader = new ResponseHeader(request.header.correlationId) + val isApiVersionsRequestVersionSupported = request.header.apiVersion <= Protocol.CURR_VERSION(ApiKeys.API_VERSIONS.id) && + request.header.apiVersion >= Protocol.MIN_VERSIONS(ApiKeys.API_VERSIONS.id) + val responseBody = if (isApiVersionsRequestVersionSupported) + KafkaApis.apiVersionsResponse + else + ApiVersionsResponse.fromError(Errors.UNSUPPORTED_VERSION) + requestChannel.sendResponse(new RequestChannel.Response(request, new ResponseSend(request.connectionId, responseHeader, responseBody))) + } + def close() { quotaManagers.foreach { case (apiKey, quotaManager) => quotaManager.shutdown() @@ -1035,4 +1058,4 @@ class KafkaApis(val requestChannel: RequestChannel, if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/8407dac6/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala 
---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala new file mode 100644 index 0000000..4429f26 --- /dev/null +++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package unit.kafka.server + +import kafka.server.KafkaApis +import org.apache.kafka.common.protocol.{Protocol, ApiKeys} +import org.junit.Assert._ +import org.junit.Test + +class ApiVersionsTest { + + @Test + def testApiVersions { + val apiVersions = KafkaApis.apiVersionsResponse.apiVersions + assertEquals("API versions for all API keys must be maintained.", apiVersions.size, ApiKeys.values().length) + + for (key <- ApiKeys.values) { + val version = KafkaApis.apiVersionsResponse.apiVersion(key.id) + assertNotNull(s"Could not find ApiVersion for API ${key.name}", version) + assertEquals(s"Incorrect min version for Api ${key.name}.", version.minVersion, Protocol.MIN_VERSIONS(key.id)) + assertEquals(s"Incorrect max version for Api ${key.name}.", version.maxVersion, Protocol.CURR_VERSION(key.id)) + + // Check if versions less than min version are indeed set as null, i.e., deprecated. + for (i <- 0 until version.minVersion) { + assertNull(s"Request version $i for API ${version.apiKey} must be null.", Protocol.REQUESTS(version.apiKey)(i)) + assertNull(s"Response version $i for API ${version.apiKey} must be null.", Protocol.RESPONSES(version.apiKey)(i)) + } + + // Check if versions between min and max versions are non null, i.e., valid. + for (i <- version.minVersion.toInt to version.maxVersion) { + assertNotNull(s"Request version $i for API ${version.apiKey} must not be null.", Protocol.REQUESTS(version.apiKey)(i)) + assertNotNull(s"Response version $i for API ${version.apiKey} must not be null.", Protocol.RESPONSES(version.apiKey)(i)) + } + } + } +} \ No newline at end of file
