MINOR: Consolidate broker request/response handling

This patch contains a few small improvements to make request/response handling 
more consistent. Primarily it consolidates request/response serialization logic 
so that `SaslServerAuthenticator` and `KafkaApis` follow the same path. It also 
reduces the amount of custom logic needed to handle unsupported versions of the 
ApiVersions requests.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3673 from hachikuji/consolidate-response-handling


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4d629a0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4d629a0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4d629a0

Branch: refs/heads/trunk
Commit: c4d629a0b3cbd11c174cb8b09a50bc8de77825e9
Parents: 05e3850
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 25 10:23:11 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Aug 25 10:23:11 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |   2 +-
 .../org/apache/kafka/clients/NetworkClient.java |  24 +-
 .../internals/ConsumerNetworkClient.java        |   4 +-
 .../clients/producer/internals/Sender.java      |   5 +-
 .../kafka/common/network/ChannelBuilders.java   |   2 +-
 .../common/network/SaslChannelBuilder.java      |  17 +-
 .../org/apache/kafka/common/network/Send.java   |   4 +-
 .../kafka/common/requests/AbstractRequest.java  | 112 +++------
 .../kafka/common/requests/AbstractResponse.java |  19 +-
 .../common/requests/ApiVersionsRequest.java     |  27 ++-
 .../common/requests/ApiVersionsResponse.java    |  12 +-
 .../kafka/common/requests/FetchResponse.java    |  21 +-
 .../kafka/common/requests/RequestContext.java   |  92 ++++++++
 .../kafka/common/requests/RequestHeader.java    |  48 ++--
 .../authenticator/SaslClientAuthenticator.java  |   8 +-
 .../authenticator/SaslServerAuthenticator.java  | 103 +++++----
 .../common/network/SaslChannelBuilderTest.java  |   4 +-
 .../common/requests/RequestContextTest.java     |  75 ++++++
 .../common/requests/RequestHeaderTest.java      |   6 +-
 .../common/requests/RequestResponseTest.java    |  12 +-
 .../authenticator/SaslAuthenticatorTest.java    |  14 +-
 .../SaslServerAuthenticatorTest.java            |   8 +-
 .../controller/ControllerChannelManager.scala   |   2 +-
 ...nsactionMarkerRequestCompletionHandler.scala |   4 +-
 .../scala/kafka/network/RequestChannel.scala    | 111 ++++-----
 .../main/scala/kafka/network/SocketServer.scala |  24 +-
 .../server/ClientRequestQuotaManager.scala      |  25 +-
 .../src/main/scala/kafka/server/KafkaApis.scala | 229 +++++++++----------
 .../kafka/server/KafkaRequestHandler.scala      |  59 ++---
 core/src/main/scala/kafka/utils/Logging.scala   |   2 +
 .../scala/other/kafka/TestOffsetManager.scala   |   3 +-
 .../unit/kafka/admin/AdminRackAwareTest.scala   |   5 +-
 .../admin/ResetConsumerGroupOffsetTest.scala    |   9 +-
 .../TransactionMarkerChannelManagerTest.scala   |  11 +-
 ...tionMarkerRequestCompletionHandlerTest.scala |  30 ++-
 .../integration/KafkaServerTestHarness.scala    |   2 +-
 .../kafka/integration/PrimitiveApiTest.scala    |   3 +-
 .../kafka/integration/TopicMetadataTest.scala   |   2 +-
 .../ZookeeperConsumerConnectorTest.scala        |   1 -
 .../kafka/log/ProducerStateManagerTest.scala    |   2 +-
 .../unit/kafka/network/SocketServerTest.scala   |  64 +++---
 .../unit/kafka/server/BaseRequestTest.scala     |   2 +-
 .../unit/kafka/server/FetchRequestTest.scala    |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |  21 +-
 .../scala/unit/kafka/server/LogOffsetTest.scala |   2 +-
 .../unit/kafka/server/LogRecoveryTest.scala     |   4 +-
 .../unit/kafka/server/OffsetCommitTest.scala    |   2 +-
 .../unit/kafka/server/ReplicaFetchTest.scala    |   3 -
 .../kafka/server/ReplicationQuotasTest.scala    |   2 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |   4 +-
 .../unit/kafka/utils/IteratorTemplateTest.scala |   2 +-
 .../unit/kafka/utils/timer/TimerTest.scala      |   3 +-
 52 files changed, 685 insertions(+), 571 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 1111964..9b62946 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -79,7 +79,7 @@ public final class ClientRequest {
     }
 
     public RequestHeader makeHeader(short version) {
-        return new RequestHeader(apiKey().id, version, clientId, correlationId);
+        return new RequestHeader(apiKey(), version, clientId, correlationId);
     }
 
     public AbstractRequest.Builder<?> requestBuilder() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 4fe55ae..897cca5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -247,11 +247,11 @@ public class NetworkClient implements KafkaClient {
         long now = time.milliseconds();
         for (InFlightRequest request : inFlightRequests.clearAll(nodeId)) {
             if (request.isInternalRequest) {
-                if (request.header.apiKey() == ApiKeys.METADATA.id) {
+                if (request.header.apiKey() == ApiKeys.METADATA) {
                     metadataUpdater.handleDisconnection(request.destination);
                 }
             } else {
-                requestTypes.add(ApiKeys.forId(request.header.apiKey()));
+                requestTypes.add(request.header.apiKey());
                 abortedSends.add(new ClientResponse(request.header,
                         request.callback, request.destination, request.createdTimeMs, now,
                         true, null, null));
@@ -275,7 +275,7 @@ public class NetworkClient implements KafkaClient {
     public void close(String nodeId) {
         selector.close(nodeId);
         for (InFlightRequest request : inFlightRequests.clearAll(nodeId))
-            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
+            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
                 metadataUpdater.handleDisconnection(request.destination);
         connectionStates.remove(nodeId);
     }
@@ -556,27 +556,21 @@ public class NetworkClient implements KafkaClient {
     }
 
     public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
-        return createResponse(parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader,
-                null, 0), requestHeader);
+        Struct responseStruct = parseStructMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
+        return AbstractResponse.parseResponse(requestHeader.apiKey(), responseStruct);
     }
 
     private static Struct parseStructMaybeUpdateThrottleTimeMetrics(ByteBuffer 
responseBuffer, RequestHeader requestHeader,
                                                                     Sensor 
throttleTimeSensor, long now) {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
-        ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
         // Always expect the response version id to be the same as the request version id
-        Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
+        Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
         if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
             throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now);
         return responseBody;
     }
 
-    private static AbstractResponse createResponse(Struct responseStruct, RequestHeader requestHeader) {
-        ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
-        return AbstractResponse.getResponse(apiKey, responseStruct);
-    }
-
     /**
      * Post process disconnection of a node
      *
@@ -602,7 +596,7 @@ public class NetworkClient implements KafkaClient {
         for (InFlightRequest request : this.inFlightRequests.clearAll(nodeId)) {
             log.trace("Cancelled request {} with correlation id {} due to node {} being disconnected", request.request,
                     request.header.correlationId(), nodeId);
-            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA.id)
+            if (request.isInternalRequest && request.header.apiKey() == ApiKeys.METADATA)
                 metadataUpdater.handleDisconnection(request.destination);
             else
                 responses.add(request.disconnected(now));
@@ -666,9 +660,9 @@ public class NetworkClient implements KafkaClient {
                 throttleTimeSensor, now);
             if (log.isTraceEnabled()) {
                 log.trace("Completed receive from node {} for {} with correlation id {}, received {}", req.destination,
-                    ApiKeys.forId(req.header.apiKey()), req.header.correlationId(), responseStruct);
+                    req.header.apiKey(), req.header.correlationId(), responseStruct);
             }
-            AbstractResponse body = createResponse(responseStruct, req.header);
+            AbstractResponse body = AbstractResponse.parseResponse(req.header.apiKey(), responseStruct);
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);
             else if (req.isInternalRequest && body instanceof ApiVersionsResponse)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4a38d04..803a853 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -26,7 +26,6 @@ import org.apache.kafka.common.errors.DisconnectException;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.utils.LogContext;
@@ -489,10 +488,9 @@ public class ConsumerNetworkClient implements Closeable {
                 future.raise(e);
             } else if (response.wasDisconnected()) {
                 RequestHeader requestHeader = response.requestHeader();
-                ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
                 int correlation = requestHeader.correlationId();
                 log.debug("Cancelled {} request {} with correlation id {} due to node {} being disconnected",
-                        api, requestHeader, correlation, response.destination());
+                        requestHeader.apiKey(), requestHeader, correlation, response.destination());
                 future.raise(DisconnectException.INSTANCE);
             } else if (response.versionMismatch() != null) {
                 future.raise(response.versionMismatch());

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 806cfdf..411282b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -44,7 +44,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.RecordBatch;
@@ -453,8 +452,8 @@ public class Sender implements Runnable {
         RequestHeader requestHeader = response.requestHeader();
         int correlationId = requestHeader.correlationId();
         if (response.wasDisconnected()) {
-            ApiKeys api = ApiKeys.forId(requestHeader.apiKey());
-            log.trace("Cancelled {} request {} with correlation id {}  due to node {} being disconnected", api, requestHeader, correlationId, response.destination());
+            log.trace("Cancelled request with header {} due to node {} being disconnected",
+                    requestHeader, response.destination());
             for (ProducerBatch batch : batches.values())
                 completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.NETWORK_EXCEPTION), correlationId, now);
         } else if (response.versionMismatch() != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java 
b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 5145be7..225f5a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -98,7 +98,7 @@ public class ChannelBuilders {
             case SASL_PLAINTEXT:
                 requireNonNullMode(mode, securityProtocol);
                 JaasContext jaasContext = JaasContext.load(contextType, 
listenerName, configs);
-                channelBuilder = new SaslChannelBuilder(mode, jaasContext, 
securityProtocol,
+                channelBuilder = new SaslChannelBuilder(mode, jaasContext, 
securityProtocol, listenerName,
                         clientSaslMechanism, saslHandshakeRequestEnable, 
credentialCache);
                 break;
             case PLAINTEXT:

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 445c1ba..0f98463 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -43,6 +43,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private static final Logger log = 
LoggerFactory.getLogger(SaslChannelBuilder.class);
 
     private final SecurityProtocol securityProtocol;
+    private final ListenerName listenerName;
     private final String clientSaslMechanism;
     private final Mode mode;
     private final JaasContext jaasContext;
@@ -54,12 +55,17 @@ public class SaslChannelBuilder implements ChannelBuilder {
     private Map<String, ?> configs;
     private KerberosShortNamer kerberosShortNamer;
 
-    public SaslChannelBuilder(Mode mode, JaasContext jaasContext, 
SecurityProtocol securityProtocol,
+    public SaslChannelBuilder(Mode mode,
+                              JaasContext jaasContext,
+                              SecurityProtocol securityProtocol,
+                              ListenerName listenerName,
                               String clientSaslMechanism,
-                              boolean handshakeRequestEnable, CredentialCache 
credentialCache) {
+                              boolean handshakeRequestEnable,
+                              CredentialCache credentialCache) {
         this.mode = mode;
         this.jaasContext = jaasContext;
         this.securityProtocol = securityProtocol;
+        this.listenerName = listenerName;
         this.handshakeRequestEnable = handshakeRequestEnable;
         this.clientSaslMechanism = clientSaslMechanism;
         this.credentialCache = credentialCache;
@@ -109,8 +115,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
             Authenticator authenticator;
             if (mode == Mode.SERVER)
                 authenticator = new SaslServerAuthenticator(id, jaasContext, 
loginManager.subject(),
-                        kerberosShortNamer, 
socketChannel.socket().getLocalAddress().getHostName(),
-                        credentialCache);
+                        kerberosShortNamer, 
socketChannel.socket().getLocalAddress(), credentialCache,
+                        listenerName, securityProtocol);
             else
                 authenticator = new SaslClientAuthenticator(id, 
loginManager.subject(), loginManager.serviceName(),
                         socketChannel.socket().getInetAddress().getHostName(), 
clientSaslMechanism, handshakeRequestEnable);
@@ -161,8 +167,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
         getInstanceMethod = classRef.getMethod("getInstance", new Class[0]);
         kerbConf = getInstanceMethod.invoke(classRef, new Object[0]);
-        getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm",
-                new Class[0]);
+        getDefaultRealmMethod = classRef.getDeclaredMethod("getDefaultRealm", 
new Class[0]);
         return (String) getDefaultRealmMethod.invoke(kerbConf, new Object[0]);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/network/Send.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java 
b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index c64193a..e6febc8 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -20,12 +20,12 @@ import java.io.IOException;
 import java.nio.channels.GatheringByteChannel;
 
 /**
- * This interface models the in-progress sending of data to a destination 
identified by an integer id.
+ * This interface models the in-progress sending of data to a specific 
destination
  */
 public interface Send {
 
     /**
-     * The numeric id for the destination of this send
+     * The id for the destination of this send
      */
     String destination();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 2cd88e1..00de8c1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -105,119 +105,81 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse {
     public abstract AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e);
 
     /**
-     * Factory method for getting a request object based on ApiKey ID and a 
buffer
+     * Factory method for getting a request object based on ApiKey ID and a 
version
      */
-    public static RequestAndSize getRequest(int requestId, short version, 
ByteBuffer buffer) {
-        ApiKeys apiKey = ApiKeys.forId(requestId);
-        Struct struct = apiKey.parseRequest(version, buffer);
-        AbstractRequest request;
+    public static AbstractRequest parseRequest(ApiKeys apiKey, short 
apiVersion, Struct struct) {
         switch (apiKey) {
             case PRODUCE:
-                request = new ProduceRequest(struct, version);
-                break;
+                return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                request = new FetchRequest(struct, version);
-                break;
+                return new FetchRequest(struct, apiVersion);
             case LIST_OFFSETS:
-                request = new ListOffsetRequest(struct, version);
-                break;
+                return new ListOffsetRequest(struct, apiVersion);
             case METADATA:
-                request = new MetadataRequest(struct, version);
-                break;
+                return new MetadataRequest(struct, apiVersion);
             case OFFSET_COMMIT:
-                request = new OffsetCommitRequest(struct, version);
-                break;
+                return new OffsetCommitRequest(struct, apiVersion);
             case OFFSET_FETCH:
-                request = new OffsetFetchRequest(struct, version);
-                break;
+                return new OffsetFetchRequest(struct, apiVersion);
             case FIND_COORDINATOR:
-                request = new FindCoordinatorRequest(struct, version);
-                break;
+                return new FindCoordinatorRequest(struct, apiVersion);
             case JOIN_GROUP:
-                request = new JoinGroupRequest(struct, version);
-                break;
+                return new JoinGroupRequest(struct, apiVersion);
             case HEARTBEAT:
-                request = new HeartbeatRequest(struct, version);
-                break;
+                return new HeartbeatRequest(struct, apiVersion);
             case LEAVE_GROUP:
-                request = new LeaveGroupRequest(struct, version);
-                break;
+                return new LeaveGroupRequest(struct, apiVersion);
             case SYNC_GROUP:
-                request = new SyncGroupRequest(struct, version);
-                break;
+                return new SyncGroupRequest(struct, apiVersion);
             case STOP_REPLICA:
-                request = new StopReplicaRequest(struct, version);
-                break;
+                return new StopReplicaRequest(struct, apiVersion);
             case CONTROLLED_SHUTDOWN_KEY:
-                request = new ControlledShutdownRequest(struct, version);
-                break;
+                return new ControlledShutdownRequest(struct, apiVersion);
             case UPDATE_METADATA_KEY:
-                request = new UpdateMetadataRequest(struct, version);
-                break;
+                return new UpdateMetadataRequest(struct, apiVersion);
             case LEADER_AND_ISR:
-                request = new LeaderAndIsrRequest(struct, version);
-                break;
+                return new LeaderAndIsrRequest(struct, apiVersion);
             case DESCRIBE_GROUPS:
-                request = new DescribeGroupsRequest(struct, version);
-                break;
+                return new DescribeGroupsRequest(struct, apiVersion);
             case LIST_GROUPS:
-                request = new ListGroupsRequest(struct, version);
-                break;
+                return new ListGroupsRequest(struct, apiVersion);
             case SASL_HANDSHAKE:
-                request = new SaslHandshakeRequest(struct, version);
-                break;
+                return new SaslHandshakeRequest(struct, apiVersion);
             case API_VERSIONS:
-                request = new ApiVersionsRequest(struct, version);
-                break;
+                return new ApiVersionsRequest(struct, apiVersion);
             case CREATE_TOPICS:
-                request = new CreateTopicsRequest(struct, version);
-                break;
+                return new CreateTopicsRequest(struct, apiVersion);
             case DELETE_TOPICS:
-                request = new DeleteTopicsRequest(struct, version);
-                break;
+                return new DeleteTopicsRequest(struct, apiVersion);
             case DELETE_RECORDS:
-                request = new DeleteRecordsRequest(struct, version);
-                break;
+                return new DeleteRecordsRequest(struct, apiVersion);
             case INIT_PRODUCER_ID:
-                request = new InitProducerIdRequest(struct, version);
-                break;
+                return new InitProducerIdRequest(struct, apiVersion);
             case OFFSET_FOR_LEADER_EPOCH:
-                request = new OffsetsForLeaderEpochRequest(struct, version);
-                break;
+                return new OffsetsForLeaderEpochRequest(struct, apiVersion);
             case ADD_PARTITIONS_TO_TXN:
-                request = new AddPartitionsToTxnRequest(struct, version);
-                break;
+                return new AddPartitionsToTxnRequest(struct, apiVersion);
             case ADD_OFFSETS_TO_TXN:
-                request = new AddOffsetsToTxnRequest(struct, version);
-                break;
+                return new AddOffsetsToTxnRequest(struct, apiVersion);
             case END_TXN:
-                request = new EndTxnRequest(struct, version);
-                break;
+                return new EndTxnRequest(struct, apiVersion);
             case WRITE_TXN_MARKERS:
-                request = new WriteTxnMarkersRequest(struct, version);
-                break;
+                return new WriteTxnMarkersRequest(struct, apiVersion);
             case TXN_OFFSET_COMMIT:
-                request = new TxnOffsetCommitRequest(struct, version);
-                break;
+                return new TxnOffsetCommitRequest(struct, apiVersion);
             case DESCRIBE_ACLS:
-                request = new DescribeAclsRequest(struct, version);
-                break;
+                return new DescribeAclsRequest(struct, apiVersion);
             case CREATE_ACLS:
-                request = new CreateAclsRequest(struct, version);
-                break;
+                return new CreateAclsRequest(struct, apiVersion);
             case DELETE_ACLS:
-                request = new DeleteAclsRequest(struct, version);
-                break;
+                return new DeleteAclsRequest(struct, apiVersion);
             case DESCRIBE_CONFIGS:
-                request = new DescribeConfigsRequest(struct, version);
-                break;
+                return new DescribeConfigsRequest(struct, apiVersion);
             case ALTER_CONFIGS:
-                request = new AlterConfigsRequest(struct, version);
-                break;
+                return new AlterConfigsRequest(struct, apiVersion);
             default:
-                throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `getRequest`, the " +
+                throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseRequest`, the " +
                         "code should be updated to do so.", apiKey));
         }
-        return new RequestAndSize(request, struct.sizeOf());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1686976..5f1f615 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -27,21 +27,12 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
     public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final int DEFAULT_THROTTLE_TIME = 0;
 
-    public Send toSend(String destination, RequestHeader requestHeader) {
-        return toSend(destination, requestHeader.apiVersion(), 
requestHeader.toResponseHeader());
+    protected Send toSend(String destination, ResponseHeader header, short 
apiVersion) {
+        return new NetworkSend(destination, serialize(apiVersion, header));
     }
 
     /**
-     * This should only be used if we need to return a response with a 
different version than the request, which
-     * should be very rare (an example is @link {@link 
ApiVersionsResponse#unsupportedVersionSend(String, RequestHeader)}).
-     * Typically {@link #toSend(String, RequestHeader)} should be used.
-     */
-    public Send toSend(String destination, short version, ResponseHeader 
responseHeader) {
-        return new NetworkSend(destination, serialize(version, 
responseHeader));
-    }
-
-    /**
-     * Visible for testing, typically {@link #toSend(String, RequestHeader)} 
should be used instead.
+     * Visible for testing, typically {@link #toSend(String, ResponseHeader, 
short)} should be used instead.
      */
     public ByteBuffer serialize(short version, ResponseHeader responseHeader) {
         return serialize(responseHeader.toStruct(), toStruct(version));
@@ -49,7 +40,7 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
 
     protected abstract Struct toStruct(short version);
 
-    public static AbstractResponse getResponse(ApiKeys apiKey, Struct struct) {
+    public static AbstractResponse parseResponse(ApiKeys apiKey, Struct 
struct) {
         switch (apiKey) {
             case PRODUCE:
                 return new ProduceResponse(struct);
@@ -120,7 +111,7 @@ public abstract class AbstractResponse extends 
AbstractRequestResponse {
             case ALTER_CONFIGS:
                 return new AlterConfigsResponse(struct);
             default:
-                throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `getResponse`, the " +
+                throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `parseResponse`, the " +
                         "code should be updated to do so.", apiKey));
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index 6f63040..025ef6c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -45,12 +45,29 @@ public class ApiVersionsRequest extends AbstractRequest {
         }
     }
 
+    private final Short unsupportedRequestVersion;
+
     public ApiVersionsRequest(short version) {
+        this(version, null);
+    }
+
+    public ApiVersionsRequest(short version, Short unsupportedRequestVersion) {
         super(version);
+
+        // Unlike other request types, the broker handles ApiVersion requests 
with higher versions than
+        // supported. It does so by treating the request as if it were v0 and 
returning a response using
+        // the v0 response schema. The reason for this is that the client does 
not yet know what versions
+        // a broker supports when this request is sent, so instead of assuming 
the lowest supported version,
+        // it can use the most recent version and only fall back to the old 
version when necessary.
+        this.unsupportedRequestVersion = unsupportedRequestVersion;
     }
 
     public ApiVersionsRequest(Struct struct, short version) {
-        super(version);
+        this(version, null);
+    }
+
+    public boolean hasUnsupportedRequestVersion() {
+        return unsupportedRequestVersion != null;
     }
 
     @Override
@@ -59,16 +76,16 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        short versionId = version();
-        switch (versionId) {
+    public ApiVersionsResponse getErrorResponse(int throttleTimeMs, Throwable 
e) {
+        short version = version();
+        switch (version) {
             case 0:
                 return new ApiVersionsResponse(Errors.forException(e), 
Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             case 1:
                 return new ApiVersionsResponse(throttleTimeMs, 
Errors.forException(e), 
Collections.<ApiVersionsResponse.ApiVersion>emptyList());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), 
ApiKeys.API_VERSIONS.latestVersion()));
+                        version, this.getClass().getSimpleName(), 
ApiKeys.API_VERSIONS.latestVersion()));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index e9d5023..5a48c93 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
@@ -25,7 +24,6 @@ import org.apache.kafka.common.record.RecordBatch;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -123,15 +121,6 @@ public class ApiVersionsResponse extends AbstractResponse {
         return createApiVersionsResponse(throttleTimeMs, maxMagic);
     }
 
-    /**
-     * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we 
don't support the requested version.
-     */
-    public static Send unsupportedVersionSend(String destination, 
RequestHeader requestHeader) {
-        ApiVersionsResponse response = new 
ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION,
-                Collections.<ApiVersion>emptyList());
-        return response.toSend(destination, (short) 0, 
requestHeader.toResponseHeader());
-    }
-
     public int throttleTimeMs() {
         return throttleTimeMs;
     }
@@ -169,4 +158,5 @@ public class ApiVersionsResponse extends AbstractResponse {
         }
         return tempApiIdToApiVersion;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index b52b6f5..281ad44 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -235,26 +235,19 @@ public class FetchResponse extends AbstractResponse {
     }
 
     @Override
-    public Send toSend(String dest, RequestHeader requestHeader) {
-        return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, 
dest, requestHeader);
-    }
-
-    public Send toSend(int throttleTimeMs, String dest, RequestHeader 
requestHeader) {
-        return toSend(toStruct(requestHeader.apiVersion()), throttleTimeMs, 
dest, requestHeader);
-    }
-
-    private Send toSend(Struct responseStruct, int throttleTimeMs, String 
dest, RequestHeader requestHeader) {
-        Struct responseHeader = new 
ResponseHeader(requestHeader.correlationId()).toStruct();
+    protected Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
+        Struct responseHeaderStruct = responseHeader.toStruct();
+        Struct responseBodyStruct = toStruct(apiVersion);
 
         // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeader.sizeOf() + 4);
-        buffer.putInt(responseHeader.sizeOf() + responseStruct.sizeOf());
-        responseHeader.writeTo(buffer);
+        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() 
+ 4);
+        buffer.putInt(responseHeaderStruct.sizeOf() + 
responseBodyStruct.sizeOf());
+        responseHeaderStruct.writeTo(buffer);
         buffer.rewind();
 
         List<Send> sends = new ArrayList<>();
         sends.add(new ByteBufferSend(dest, buffer));
-        addResponseData(responseStruct, throttleTimeMs, dest, sends);
+        addResponseData(responseBodyStruct, throttleTimeMs, dest, sends);
         return new MultiSend(dest, sends);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java 
b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
new file mode 100644
index 0000000..34bb3f5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestContext.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Protocol;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.protocol.ApiKeys.API_VERSIONS;
+
+public class RequestContext {
+    public final RequestHeader header;
+    public final String connectionId;
+    public final InetAddress clientAddress;
+    public final KafkaPrincipal principal;
+    public final ListenerName listenerName;
+    public final SecurityProtocol securityProtocol;
+
+    public RequestContext(RequestHeader header,
+                          String connectionId,
+                          InetAddress clientAddress,
+                          KafkaPrincipal principal,
+                          ListenerName listenerName,
+                          SecurityProtocol securityProtocol) {
+        this.header = header;
+        this.connectionId = connectionId;
+        this.clientAddress = clientAddress;
+        this.principal = principal;
+        this.listenerName = listenerName;
+        this.securityProtocol = securityProtocol;
+    }
+
+    public RequestAndSize parseRequest(ByteBuffer buffer) {
+        if (isUnsupportedApiVersionsRequest()) {
+            // Unsupported ApiVersion requests are treated as v0 requests and 
are not parsed
+            ApiVersionsRequest apiVersionsRequest = new 
ApiVersionsRequest((short) 0, header.apiVersion());
+            return new RequestAndSize(apiVersionsRequest, 0);
+        } else {
+            ApiKeys apiKey = header.apiKey();
+            try {
+                short apiVersion = header.apiVersion();
+                Struct struct = apiKey.parseRequest(apiVersion, buffer);
+                AbstractRequest body = AbstractRequest.parseRequest(apiKey, 
apiVersion, struct);
+                return new RequestAndSize(body, struct.sizeOf());
+            } catch (Throwable ex) {
+                throw new InvalidRequestException("Error getting request for 
apiKey: " + apiKey +
+                        ", apiVersion: " + header.apiVersion() +
+                        ", connectionId: " + connectionId +
+                        ", listenerName: " + listenerName +
+                        ", principal: " + principal, ex);
+            }
+        }
+    }
+
+    public Send buildResponse(AbstractResponse body) {
+        ResponseHeader responseHeader = header.toResponseHeader();
+        short version = header.apiVersion();
+
+        // Use v0 when serializing an unhandled ApiVersion response
+        if (isUnsupportedApiVersionsRequest())
+            version = 0;
+
+        return body.toSend(connectionId, responseHeader, version);
+    }
+
+    private boolean isUnsupportedApiVersionsRequest() {
+        return header.apiKey() == API_VERSIONS && 
!Protocol.apiVersionSupported(API_VERSIONS.id, header.apiVersion());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java 
b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
index 18ea576..43b7baf 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestHeader.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.errors.InvalidRequestException;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Protocol;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
 
+import static java.util.Objects.requireNonNull;
+
 /**
  * The header for a request in the Kafka protocol
  */
@@ -31,13 +35,17 @@ public class RequestHeader extends AbstractRequestResponse {
     private static final String CLIENT_ID_FIELD_NAME = "client_id";
     private static final String CORRELATION_ID_FIELD_NAME = "correlation_id";
 
-    private final short apiKey;
+    private final ApiKeys apiKey;
     private final short apiVersion;
     private final String clientId;
     private final int correlationId;
 
     public RequestHeader(Struct struct) {
-        apiKey = struct.getShort(API_KEY_FIELD_NAME);
+        short apiKey = struct.getShort(API_KEY_FIELD_NAME);
+        if (!ApiKeys.hasId(apiKey))
+            throw new InvalidRequestException("Unknown API key " + apiKey);
+
+        this.apiKey = ApiKeys.forId(apiKey);
         apiVersion = struct.getShort(API_VERSION_FIELD_NAME);
 
         // only v0 of the controlled shutdown request is missing the clientId
@@ -48,17 +56,17 @@ public class RequestHeader extends AbstractRequestResponse {
         correlationId = struct.getInt(CORRELATION_ID_FIELD_NAME);
     }
 
-    public RequestHeader(short apiKey, short version, String clientId, int 
correlation) {
-        this.apiKey = apiKey;
+    public RequestHeader(ApiKeys apiKey, short version, String clientId, int 
correlation) {
+        this.apiKey = requireNonNull(apiKey);
         this.apiVersion = version;
         this.clientId = clientId;
         this.correlationId = correlation;
     }
 
     public Struct toStruct() {
-        Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
+        Schema schema = Protocol.requestHeaderSchema(apiKey.id, apiVersion);
         Struct struct = new Struct(schema);
-        struct.set(API_KEY_FIELD_NAME, apiKey);
+        struct.set(API_KEY_FIELD_NAME, apiKey.id);
         struct.set(API_VERSION_FIELD_NAME, apiVersion);
 
         // only v0 of the controlled shutdown request is missing the clientId
@@ -68,7 +76,7 @@ public class RequestHeader extends AbstractRequestResponse {
         return struct;
     }
 
-    public short apiKey() {
+    public ApiKeys apiKey() {
         return apiKey;
     }
 
@@ -89,16 +97,27 @@ public class RequestHeader extends AbstractRequestResponse {
     }
 
     public static RequestHeader parse(ByteBuffer buffer) {
-        short apiKey = buffer.getShort();
-        short apiVersion = buffer.getShort();
-        Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
-        buffer.rewind();
-        return new RequestHeader(schema.read(buffer));
+        try {
+            short apiKey = buffer.getShort();
+            short apiVersion = buffer.getShort();
+            Schema schema = Protocol.requestHeaderSchema(apiKey, apiVersion);
+            buffer.rewind();
+            return new RequestHeader(schema.read(buffer));
+        } catch (InvalidRequestException e) {
+            throw e;
+        } catch (Throwable  ex) {
+            throw new InvalidRequestException("Error parsing request header. 
Our best guess of the apiKey is: " +
+                    buffer.getShort(0), ex);
+        }
     }
 
     @Override
     public String toString() {
-        return toStruct().toString();
+        return "RequestHeader(apiKey=" + apiKey +
+                ", apiVersion=" + apiVersion +
+                ", clientId=" + clientId +
+                ", correlationId=" + correlationId +
+                ")";
     }
 
     @Override
@@ -115,11 +134,10 @@ public class RequestHeader extends 
AbstractRequestResponse {
 
     @Override
     public int hashCode() {
-        int result = (int) apiKey;
+        int result = apiKey.hashCode();
         result = 31 * result + (int) apiVersion;
         result = 31 * result + (clientId != null ? clientId.hashCode() : 0);
         result = 31 * result + correlationId;
         return result;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index 6dab8f9..6bf9b2a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -159,7 +159,7 @@ public class SaslClientAuthenticator implements 
Authenticator {
                 // fetch supported versions.
                 String clientId = (String) 
configs.get(CommonClientConfigs.CLIENT_ID_CONFIG);
                 SaslHandshakeRequest handshakeRequest = new 
SaslHandshakeRequest(mechanism);
-                currentRequestHeader = new 
RequestHeader(ApiKeys.SASL_HANDSHAKE.id,
+                currentRequestHeader = new 
RequestHeader(ApiKeys.SASL_HANDSHAKE,
                         handshakeRequest.version(), clientId, correlationId++);
                 send(handshakeRequest.toSend(node, currentRequestHeader));
                 setSaslState(SaslState.RECEIVE_HANDSHAKE_RESPONSE);
@@ -308,20 +308,18 @@ public class SaslClientAuthenticator implements 
Authenticator {
 
     private void handleKafkaResponse(RequestHeader requestHeader, byte[] 
responseBytes) {
         AbstractResponse response;
-        ApiKeys apiKey;
         try {
             response = 
NetworkClient.parseResponse(ByteBuffer.wrap(responseBytes), requestHeader);
-            apiKey = ApiKeys.forId(requestHeader.apiKey());
         } catch (SchemaException | IllegalArgumentException e) {
             LOG.debug("Invalid SASL mechanism response, server may be 
expecting only GSSAPI tokens");
             throw new AuthenticationException("Invalid SASL mechanism 
response", e);
         }
-        switch (apiKey) {
+        switch (requestHeader.apiKey()) {
             case SASL_HANDSHAKE:
                 handleSaslHandshakeResponse((SaslHandshakeResponse) response);
                 break;
             default:
-                throw new IllegalStateException("Unexpected API key during 
handshake: " + apiKey);
+                throw new IllegalStateException("Unexpected API key during 
handshake: " + requestHeader.apiKey());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index ca2d3eb..276d067 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -20,9 +20,10 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
+import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.Authenticator;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.NetworkSend;
@@ -30,11 +31,12 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.Protocol;
-import org.apache.kafka.common.protocol.types.SchemaException;
-import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.RequestAndSize;
+import org.apache.kafka.common.requests.RequestContext;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.requests.SaslHandshakeRequest;
 import org.apache.kafka.common.requests.SaslHandshakeResponse;
@@ -61,6 +63,7 @@ import javax.security.sasl.Sasl;
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslServer;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.security.Principal;
@@ -81,11 +84,13 @@ public class SaslServerAuthenticator implements 
Authenticator {
         GSSAPI_OR_HANDSHAKE_REQUEST, HANDSHAKE_REQUEST, AUTHENTICATE, 
COMPLETE, FAILED
     }
 
-    private final String node;
+    private final SecurityProtocol securityProtocol;
+    private final ListenerName listenerName;
+    private final String connectionId;
     private final JaasContext jaasContext;
     private final Subject subject;
     private final KerberosShortNamer kerberosNamer;
-    private final String host;
+    private final InetAddress clientAddress;
     private final CredentialCache credentialCache;
 
     // Current SASL state
@@ -105,17 +110,24 @@ public class SaslServerAuthenticator implements 
Authenticator {
     private NetworkReceive netInBuffer;
     private Send netOutBuffer;
 
-    public SaslServerAuthenticator(String node, JaasContext jaasContext, final 
Subject subject,
-                                   KerberosShortNamer kerberosNameParser, 
String host,
-                                   CredentialCache credentialCache) throws 
IOException {
+    public SaslServerAuthenticator(String connectionId,
+                                   JaasContext jaasContext,
+                                   Subject subject,
+                                   KerberosShortNamer kerberosNameParser,
+                                   InetAddress clientAddress,
+                                   CredentialCache credentialCache,
+                                   ListenerName listenerName,
+                                   SecurityProtocol securityProtocol) throws 
IOException {
         if (subject == null)
             throw new IllegalArgumentException("subject cannot be null");
-        this.node = node;
+        this.connectionId = connectionId;
         this.jaasContext = jaasContext;
         this.subject = subject;
         this.kerberosNamer = kerberosNameParser;
-        this.host = host;
+        this.clientAddress = clientAddress;
         this.credentialCache = credentialCache;
+        this.listenerName = listenerName;
+        this.securityProtocol = securityProtocol;
     }
 
     public void configure(TransportLayer transportLayer, PrincipalBuilder 
principalBuilder, Map<String, ?> configs) {
@@ -140,7 +152,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             try {
                 saslServer = Subject.doAs(subject, new 
PrivilegedExceptionAction<SaslServer>() {
                     public SaslServer run() throws SaslException {
-                        return Sasl.createSaslServer(saslMechanism, "kafka", 
host, configs, callbackHandler);
+                        return Sasl.createSaslServer(saslMechanism, "kafka", 
clientAddress.getHostName(), configs, callbackHandler);
                     }
                 });
             } catch (PrivilegedActionException e) {
@@ -211,7 +223,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
             return;
         }
 
-        if (netInBuffer == null) netInBuffer = new 
NetworkReceive(MAX_RECEIVE_SIZE, node);
+        if (netInBuffer == null) netInBuffer = new 
NetworkReceive(MAX_RECEIVE_SIZE, connectionId);
 
         netInBuffer.readFrom(transportLayer);
 
@@ -233,7 +245,7 @@ public class SaslServerAuthenticator implements 
Authenticator {
                     case AUTHENTICATE:
                         byte[] response = 
saslServer.evaluateResponse(clientToken);
                         if (response != null) {
-                            netOutBuffer = new NetworkSend(node, 
ByteBuffer.wrap(response));
+                            netOutBuffer = new NetworkSend(connectionId, 
ByteBuffer.wrap(response));
                             flushNetOutBufferAndUpdateInterestOps();
                         }
                         // When the authentication exchange is complete and no 
more tokens are expected from the client,
@@ -298,38 +310,32 @@ public class SaslServerAuthenticator implements 
Authenticator {
         String clientMechanism = null;
         try {
             ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-            RequestHeader requestHeader = RequestHeader.parse(requestBuffer);
-            ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
+            RequestHeader header = RequestHeader.parse(requestBuffer);
+            ApiKeys apiKey = header.apiKey();
+
             // A valid Kafka request header was received. SASL authentication 
tokens are now expected only
             // following a SaslHandshakeRequest since this is not a GSSAPI 
client token from a Kafka 0.9.0.x client.
             setSaslState(SaslState.HANDSHAKE_REQUEST);
             isKafkaRequest = true;
 
-            if (!Protocol.apiVersionSupported(requestHeader.apiKey(), 
requestHeader.apiVersion())) {
-                if (apiKey == ApiKeys.API_VERSIONS)
-                    
sendKafkaResponse(ApiVersionsResponse.unsupportedVersionSend(node, 
requestHeader));
-                else
-                    throw new UnsupportedVersionException("Version " + 
requestHeader.apiVersion() + " is not supported for apiKey " + apiKey);
-            } else {
-                LOG.debug("Handle Kafka request {}", apiKey);
-                switch (apiKey) {
-                    case API_VERSIONS:
-                        handleApiVersionsRequest(requestHeader);
-                        break;
-                    case SASL_HANDSHAKE:
-                        short version = requestHeader.apiVersion();
-                        Struct struct = 
ApiKeys.SASL_HANDSHAKE.parseRequest(version, requestBuffer);
-                        SaslHandshakeRequest saslHandshakeRequest = new 
SaslHandshakeRequest(struct, version);
-                        clientMechanism = 
handleHandshakeRequest(requestHeader, saslHandshakeRequest);
-                        break;
-                    default:
-                        throw new IllegalSaslStateException("Unexpected Kafka 
request of type " + apiKey + " during SASL handshake.");
-                }
-            }
-        } catch (SchemaException | IllegalArgumentException e) {
+            // Raise an error prior to parsing if the api cannot be handled at 
this layer. This avoids
+            // unnecessary exposure to some of the more complex schema types.
+            if (apiKey != ApiKeys.API_VERSIONS && apiKey != 
ApiKeys.SASL_HANDSHAKE)
+                throw new IllegalSaslStateException("Unexpected Kafka request 
of type " + apiKey + " during SASL handshake.");
+
+            LOG.debug("Handling Kafka request {}", apiKey);
+
+            RequestContext requestContext = new RequestContext(header, 
connectionId, clientAddress,
+                    KafkaPrincipal.ANONYMOUS, listenerName, securityProtocol);
+            RequestAndSize requestAndSize = 
requestContext.parseRequest(requestBuffer);
+            if (apiKey == ApiKeys.API_VERSIONS)
+                handleApiVersionsRequest(requestContext, (ApiVersionsRequest) 
requestAndSize.request);
+            else
+                clientMechanism = handleHandshakeRequest(requestContext, 
(SaslHandshakeRequest) requestAndSize.request);
+        } catch (InvalidRequestException e) {
             if (saslState == SaslState.GSSAPI_OR_HANDSHAKE_REQUEST) {
-                // SchemaException is thrown if the request is not in Kafka 
format. IllegalArgumentException is thrown
-                // if the API key is invalid. For compatibility with 0.9.0.x 
where the first packet is a GSSAPI token
+                // InvalidRequestException is thrown if the request is not in 
Kafka format or if the API key
+                // is invalid. For compatibility with 0.9.0.x where the first 
packet is a GSSAPI token
                 // starting with 0x60, revert to GSSAPI for both these 
exceptions.
                 if (LOG.isDebugEnabled()) {
                     StringBuilder tokenBuilder = new StringBuilder();
@@ -355,25 +361,28 @@ public class SaslServerAuthenticator implements 
Authenticator {
         return isKafkaRequest;
     }
 
-    private String handleHandshakeRequest(RequestHeader requestHeader, 
SaslHandshakeRequest handshakeRequest) throws IOException, 
UnsupportedSaslMechanismException {
+    private String handleHandshakeRequest(RequestContext context, 
SaslHandshakeRequest handshakeRequest) throws IOException, 
UnsupportedSaslMechanismException {
         String clientMechanism = handshakeRequest.mechanism();
         if (enabledMechanisms.contains(clientMechanism)) {
             LOG.debug("Using SASL mechanism '{}' provided by client", 
clientMechanism);
-            sendKafkaResponse(requestHeader, new 
SaslHandshakeResponse(Errors.NONE, enabledMechanisms));
+            sendKafkaResponse(context, new SaslHandshakeResponse(Errors.NONE, 
enabledMechanisms));
             return clientMechanism;
         } else {
             LOG.debug("SASL mechanism '{}' requested by client is not 
supported", clientMechanism);
-            sendKafkaResponse(requestHeader, new 
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
+            sendKafkaResponse(context, new 
SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, enabledMechanisms));
             throw new UnsupportedSaslMechanismException("Unsupported SASL 
mechanism " + clientMechanism);
         }
     }
 
-    private void handleApiVersionsRequest(RequestHeader requestHeader) throws 
IOException, UnsupportedSaslMechanismException {
-        sendKafkaResponse(requestHeader, 
ApiVersionsResponse.API_VERSIONS_RESPONSE);
+    private void handleApiVersionsRequest(RequestContext context, 
ApiVersionsRequest apiVersionsRequest) throws IOException, 
UnsupportedSaslMechanismException {
+        if (apiVersionsRequest.hasUnsupportedRequestVersion())
+            sendKafkaResponse(context, apiVersionsRequest.getErrorResponse(0, 
Errors.UNSUPPORTED_VERSION.exception()));
+        else
+            sendKafkaResponse(context, 
ApiVersionsResponse.API_VERSIONS_RESPONSE);
     }
 
-    private void sendKafkaResponse(RequestHeader requestHeader, 
AbstractResponse response) throws IOException {
-        sendKafkaResponse(response.toSend(node, requestHeader));
+    private void sendKafkaResponse(RequestContext context, AbstractResponse 
response) throws IOException {
+        sendKafkaResponse(context.buildResponse(response));
     }
 
     private void sendKafkaResponse(Send send) throws IOException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index 2f41c77..275104a 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -71,8 +71,8 @@ public class SaslChannelBuilderTest {
         TestJaasConfig jaasConfig = new TestJaasConfig();
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), 
new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig);
-        return new SaslChannelBuilder(Mode.CLIENT, jaasContext, 
securityProtocol, "PLAIN",
-                true, null);
+        return new SaslChannelBuilder(Mode.CLIENT, jaasContext, 
securityProtocol, new ListenerName("PLAIN"),
+                "PLAIN", true, null);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
new file mode 100644
index 0000000..7679711
--- /dev/null
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestContextTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RequestContextTest {
+
+    @Test
+    public void testSerdeUnsupportedApiVersionRequest() throws Exception {
+        int correlationId = 23423;
+
+        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, 
Short.MAX_VALUE, "", correlationId);
+        RequestContext context = new RequestContext(header, "0", 
InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS,
+                new ListenerName("ssl"), SecurityProtocol.SASL_SSL);
+
+        // Write some garbage to the request buffer. This should be ignored since we will treat
+        // the unknown version type as v0 which has an empty request body.
+        ByteBuffer requestBuffer = ByteBuffer.allocate(8);
+        requestBuffer.putInt(3709234);
+        requestBuffer.putInt(29034);
+        requestBuffer.flip();
+
+        RequestAndSize requestAndSize = context.parseRequest(requestBuffer);
+        assertTrue(requestAndSize.request instanceof ApiVersionsRequest);
+        ApiVersionsRequest request = (ApiVersionsRequest) 
requestAndSize.request;
+        assertTrue(request.hasUnsupportedRequestVersion());
+
+        Send send = context.buildResponse(new ApiVersionsResponse(0, 
Errors.UNSUPPORTED_VERSION,
+                Collections.<ApiVersionsResponse.ApiVersion>emptyList()));
+        ByteBufferChannel channel = new ByteBufferChannel(256);
+        send.writeTo(channel);
+
+        ByteBuffer responseBuffer = channel.buffer();
+        responseBuffer.flip();
+        responseBuffer.getInt(); // strip off the size
+
+        ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
+        assertEquals(correlationId, responseHeader.correlationId());
+
+        Struct struct = ApiKeys.API_VERSIONS.parseResponse((short) 0, 
responseBuffer);
+        ApiVersionsResponse response = (ApiVersionsResponse) 
AbstractResponse.parseResponse(ApiKeys.API_VERSIONS, struct);
+        assertEquals(Errors.UNSUPPORTED_VERSION, response.error());
+        assertTrue(response.apiVersions().isEmpty());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
index a1184d8..bc3bd37 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestHeaderTest.java
@@ -39,7 +39,7 @@ public class RequestHeaderTest {
         rawBuffer.flip();
 
         RequestHeader deserialized = RequestHeader.parse(rawBuffer);
-        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, 
deserialized.apiKey());
+        assertEquals(ApiKeys.CONTROLLED_SHUTDOWN_KEY, deserialized.apiKey());
         assertEquals(0, deserialized.apiVersion());
         assertEquals(correlationId, deserialized.correlationId());
         assertEquals("", deserialized.clientId());
@@ -55,7 +55,7 @@ public class RequestHeaderTest {
 
     @Test
     public void testRequestHeader() {
-        RequestHeader header = new RequestHeader((short) 10, (short) 1, "", 
10);
+        RequestHeader header = new RequestHeader(ApiKeys.FIND_COORDINATOR, 
(short) 1, "", 10);
         ByteBuffer buffer = toBuffer(header.toStruct());
         RequestHeader deserialized = RequestHeader.parse(buffer);
         assertEquals(header, deserialized);
@@ -63,7 +63,7 @@ public class RequestHeaderTest {
 
     @Test
     public void testRequestHeaderWithNullClientId() {
-        RequestHeader header = new RequestHeader((short) 10, (short) 1, null, 
10);
+        RequestHeader header = new RequestHeader(ApiKeys.FIND_COORDINATOR, 
(short) 1, null, 10);
         Struct headerStruct = header.toStruct();
         ByteBuffer buffer = toBuffer(headerStruct);
         RequestHeader deserialized = RequestHeader.parse(buffer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 42d4205..067fc27 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -456,10 +456,10 @@ public class RequestResponseTest {
     @Test
     public void verifyFetchResponseFullWrite() throws Exception {
         FetchResponse fetchResponse = createFetchResponse();
-        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, 
ApiKeys.FETCH.latestVersion(),
-                "client", 15);
+        short apiVersion = ApiKeys.FETCH.latestVersion();
+        int correlationId = 15;
 
-        Send send = fetchResponse.toSend("1", header);
+        Send send = fetchResponse.toSend("1", new 
ResponseHeader(correlationId), apiVersion);
         ByteBufferChannel channel = new ByteBufferChannel(send.size());
         send.writeTo(channel);
         channel.close();
@@ -472,11 +472,11 @@ public class RequestResponseTest {
 
         // read the header
         ResponseHeader responseHeader = ResponseHeader.parse(channel.buffer());
-        assertEquals(header.correlationId(), responseHeader.correlationId());
+        assertEquals(correlationId, responseHeader.correlationId());
 
         // read the body
-        Struct responseBody = 
ApiKeys.FETCH.responseSchema(header.apiVersion()).read(buf);
-        assertEquals(fetchResponse.toStruct(header.apiVersion()), 
responseBody);
+        Struct responseBody = 
ApiKeys.FETCH.responseSchema(apiVersion).read(buf);
+        assertEquals(fetchResponse.toStruct(apiVersion), responseBody);
 
         assertEquals(size, responseHeader.sizeOf() + responseBody.sizeOf());
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 77cbdbe..6b0eca3 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -392,7 +392,7 @@ public class SaslAuthenticatorTest {
         // Send ApiVersionsRequest with unsupported version and validate error response.
         String node = "1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node);
-        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS.id, 
Short.MAX_VALUE, "someclient", 1);
+        RequestHeader header = new RequestHeader(ApiKeys.API_VERSIONS, 
Short.MAX_VALUE, "someclient", 1);
         ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
         selector.send(request.toSend(node, header));
         ByteBuffer responseBuffer = waitForResponse();
@@ -425,7 +425,7 @@ public class SaslAuthenticatorTest {
         String node1 = "invalid1";
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         SaslHandshakeRequest request = new SaslHandshakeRequest("PLAIN");
-        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE.id, 
Short.MAX_VALUE, "someclient", 2);
+        RequestHeader header = new RequestHeader(ApiKeys.SASL_HANDSHAKE, 
Short.MAX_VALUE, "someclient", 2);
         selector.send(request.toSend(node1, header));
         NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
         selector.close();
@@ -490,8 +490,7 @@ public class SaslAuthenticatorTest {
         sendHandshakeRequestReceiveResponse(node1);
 
         ApiVersionsRequest request = new ApiVersionsRequest.Builder().build();
-        RequestHeader versionsHeader = new 
RequestHeader(ApiKeys.API_VERSIONS.id,
-                request.version(), "someclient", 2);
+        RequestHeader versionsHeader = new RequestHeader(ApiKeys.API_VERSIONS, 
request.version(), "someclient", 2);
         selector.send(request.toSend(node1, versionsHeader));
         NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
         selector.close();
@@ -556,7 +555,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node1);
         MetadataRequest metadataRequest1 = new 
MetadataRequest.Builder(Collections.singletonList("sometopic"),
                 true).build();
-        RequestHeader metadataRequestHeader1 = new 
RequestHeader(ApiKeys.METADATA.id, metadataRequest1.version(),
+        RequestHeader metadataRequestHeader1 = new 
RequestHeader(ApiKeys.METADATA, metadataRequest1.version(),
                 "someclient", 1);
         selector.send(metadataRequest1.toSend(node1, metadataRequestHeader1));
         NetworkTestUtils.waitForChannelClose(selector, node1, 
ChannelState.READY);
@@ -570,7 +569,7 @@ public class SaslAuthenticatorTest {
         createClientConnection(SecurityProtocol.PLAINTEXT, node2);
         sendHandshakeRequestReceiveResponse(node2);
         MetadataRequest metadataRequest2 = new 
MetadataRequest.Builder(Collections.singletonList("sometopic"), true).build();
-        RequestHeader metadataRequestHeader2 = new 
RequestHeader(ApiKeys.METADATA.id,
+        RequestHeader metadataRequestHeader2 = new 
RequestHeader(ApiKeys.METADATA,
                 metadataRequest2.version(), "someclient", 2);
         selector.send(metadataRequest2.toSend(node2, metadataRequestHeader2));
         NetworkTestUtils.waitForChannelClose(selector, node2, 
ChannelState.READY);
@@ -829,8 +828,7 @@ public class SaslAuthenticatorTest {
     }
 
     private AbstractResponse sendKafkaRequestReceiveResponse(String node, 
ApiKeys apiKey, AbstractRequest request) throws IOException {
-        RequestHeader header =
-                new RequestHeader(apiKey.id, request.version(), "someclient", 
1);
+        RequestHeader header = new RequestHeader(apiKey, request.version(), 
"someclient", 1);
         Send send = request.toSend(node, header);
         selector.send(send);
         ByteBuffer responseBuffer = waitForResponse();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
index b76f3cc..d37c206 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java
@@ -19,8 +19,10 @@ package org.apache.kafka.common.security.authenticator;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.network.InvalidReceiveException;
+import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.TransportLayer;
 import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.RequestHeader;
 import org.apache.kafka.common.security.JaasContext;
@@ -33,6 +35,7 @@ import org.junit.Test;
 
 import javax.security.auth.Subject;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -75,7 +78,7 @@ public class SaslServerAuthenticatorTest {
         Map<String, ?> configs = 
Collections.singletonMap(SaslConfigs.SASL_ENABLED_MECHANISMS,
                 Collections.singletonList(SCRAM_SHA_256.mechanismName()));
 
-        final RequestHeader header = new RequestHeader(ApiKeys.METADATA.id, 
(short) 0, "clientId", 13243);
+        final RequestHeader header = new RequestHeader(ApiKeys.METADATA, 
(short) 0, "clientId", 13243);
         final Struct headerStruct = header.toStruct();
 
         final Capture<ByteBuffer> size = EasyMock.newCapture();
@@ -113,7 +116,8 @@ public class SaslServerAuthenticatorTest {
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), 
new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", 
JaasContext.Type.SERVER, jaasConfig);
         Subject subject = new Subject();
-        return new SaslServerAuthenticator("node", jaasContext, subject, null, 
"localhost", new CredentialCache());
+        return new SaslServerAuthenticator("node", jaasContext, subject, null, 
InetAddress.getLocalHost(),
+                new CredentialCache(), new ListenerName("ssl"), 
SecurityProtocol.SASL_SSL);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b89fb62..57ff203 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -238,7 +238,7 @@ class RequestSendThread(val controllerId: Int,
       }
       if (clientResponse != null) {
         val requestHeader = clientResponse.requestHeader
-        val api = ApiKeys.forId(requestHeader.apiKey)
+        val api = requestHeader.apiKey
         if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && 
api != ApiKeys.UPDATE_METADATA_KEY)
           throw new KafkaException(s"Unexpected apiKey received: $apiKey")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/c4d629a0/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 19c37fa..150b444 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -37,9 +37,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
     val requestHeader = response.requestHeader
     val correlationId = requestHeader.correlationId
     if (response.wasDisconnected) {
-      val api = ApiKeys.forId(requestHeader.apiKey)
-      val correlation = requestHeader.correlationId
-      trace(s"Cancelled $api request $requestHeader with correlation id 
$correlation due to node ${response.destination} being disconnected")
+      trace(s"Cancelled request with header $requestHeader due to node 
${response.destination} being disconnected")
 
       for (txnIdAndMarker <- txnIdAndMarkerEntries.asScala) {
         val transactionalId = txnIdAndMarker.txnId

Reply via email to