MINOR: Remove Struct from Request/Response classes

More details:
* Replaced `struct` field in Request/Response with a `toStruct` method. This
makes the performance model (including memory usage) easier to understand.
Note that requests have `toStruct()` while responses have `toStruct(version)`.
* Replaced mutable `version` field in `Request.Builder` with an immutable
field `desiredVersion` and a `version` parameter passed to the `build` method.
* Optimised `handleFetchRequest` to avoid unnecessary creation of `Struct`
instances (from 4 to 2 in the worst case and 2 to 1 in the best case).
* Various clean-ups in request/response classes and their test. In particular,
it is now clear what we are testing. Previously, it looked like we were testing
more than we really were.

With this in place, we could remove `AbstractRequest.Builder` in the future by
doing the following:
* Change `AbstractRequest.toStruct` to accept a version (like responses).
* Change `AbstractRequest.version` to be `desiredVersion` (like `Builder`).
* Change `ClientRequest` to take `AbstractRequest`.
* Move validation from the `build` methods to the request constructors or
static factory methods.
* Anything else required for the code to compile again.

Author: Ismael Juma <ism...@juma.me.uk>

Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Jason Gustafson 
<ja...@confluent.io>

Closes #2513 from ijuma/separate-struct


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fc1cfe47
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fc1cfe47
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fc1cfe47

Branch: refs/heads/trunk
Commit: fc1cfe475e8ae8458d8ddf119ce18d0c64653a70
Parents: 1f2ee5f
Author: Ismael Juma <ism...@juma.me.uk>
Authored: Fri Feb 17 14:19:01 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Feb 17 14:19:01 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientRequest.java |   5 +-
 .../org/apache/kafka/clients/NetworkClient.java |  21 +-
 .../consumer/internals/ConsumerCoordinator.java |   9 +-
 .../internals/ConsumerNetworkClient.java        |   5 +-
 .../clients/consumer/internals/Fetcher.java     |  19 +-
 .../kafka/common/requests/AbstractRequest.java  | 110 ++++--
 .../requests/AbstractRequestResponse.java       |  53 +--
 .../kafka/common/requests/AbstractResponse.java |  25 +-
 .../common/requests/ApiVersionsRequest.java     |  26 +-
 .../common/requests/ApiVersionsResponse.java    |  49 +--
 .../requests/ControlledShutdownRequest.java     |  23 +-
 .../requests/ControlledShutdownResponse.java    |  40 +--
 .../common/requests/CreateTopicsRequest.java    | 106 +++---
 .../common/requests/CreateTopicsResponse.java   |  45 ++-
 .../common/requests/DeleteTopicsRequest.java    |  28 +-
 .../common/requests/DeleteTopicsResponse.java   |  35 +-
 .../common/requests/DescribeGroupsRequest.java  |  25 +-
 .../common/requests/DescribeGroupsResponse.java |  67 ++--
 .../kafka/common/requests/FetchRequest.java     | 109 +++---
 .../kafka/common/requests/FetchResponse.java    |  74 ++--
 .../requests/GroupCoordinatorRequest.java       |  20 +-
 .../requests/GroupCoordinatorResponse.java      |  26 +-
 .../kafka/common/requests/HeartbeatRequest.java |  27 +-
 .../common/requests/HeartbeatResponse.java      |  17 +-
 .../kafka/common/requests/JoinGroupRequest.java |  51 ++-
 .../common/requests/JoinGroupResponse.java      |  57 ++-
 .../common/requests/LeaderAndIsrRequest.java    |  86 ++---
 .../common/requests/LeaderAndIsrResponse.java   |  46 +--
 .../common/requests/LeaveGroupRequest.java      |  24 +-
 .../common/requests/LeaveGroupResponse.java     |  22 +-
 .../common/requests/ListGroupsRequest.java      |  19 +-
 .../common/requests/ListGroupsResponse.java     |  33 +-
 .../common/requests/ListOffsetRequest.java      | 113 +++---
 .../common/requests/ListOffsetResponse.java     |  76 ++--
 .../kafka/common/requests/MetadataRequest.java  |  33 +-
 .../kafka/common/requests/MetadataResponse.java | 129 ++++---
 .../common/requests/OffsetCommitRequest.java    | 135 +++-----
 .../common/requests/OffsetCommitResponse.java   |  52 ++-
 .../common/requests/OffsetFetchRequest.java     |  75 ++--
 .../common/requests/OffsetFetchResponse.java    |  77 ++---
 .../kafka/common/requests/ProduceRequest.java   |  70 ++--
 .../kafka/common/requests/ProduceResponse.java  |  42 +--
 .../kafka/common/requests/RequestAndSize.java   |  27 ++
 .../kafka/common/requests/RequestHeader.java    |  21 +-
 .../kafka/common/requests/ResponseHeader.java   |  15 +-
 .../common/requests/SaslHandshakeRequest.java   |  21 +-
 .../common/requests/SaslHandshakeResponse.java  |  21 +-
 .../common/requests/StopReplicaRequest.java     |  50 +--
 .../common/requests/StopReplicaResponse.java    |  49 ++-
 .../kafka/common/requests/SyncGroupRequest.java |  44 +--
 .../common/requests/SyncGroupResponse.java      |  21 +-
 .../common/requests/UpdateMetadataRequest.java  | 130 +++----
 .../common/requests/UpdateMetadataResponse.java |  18 +-
 .../authenticator/SaslServerAuthenticator.java  |  11 +-
 .../kafka/common/utils/CollectionUtils.java     |  12 +-
 .../org/apache/kafka/clients/MockClient.java    |  17 +-
 .../apache/kafka/clients/NetworkClientTest.java |  10 +-
 .../clients/consumer/KafkaConsumerTest.java     |   2 +-
 .../clients/consumer/internals/FetcherTest.java |   2 +-
 .../common/requests/RequestResponseTest.java    | 345 ++++++++++---------
 .../authenticator/SaslAuthenticatorTest.java    |   2 +-
 .../src/main/scala/kafka/api/FetchRequest.scala |   2 +-
 .../kafka/api/GenericRequestAndHeader.scala     |  56 ---
 .../kafka/api/GenericResponseAndHeader.scala    |  47 ---
 .../main/scala/kafka/api/ProducerRequest.scala  |   2 +-
 .../controller/ControllerChannelManager.scala   |  11 +-
 .../scala/kafka/network/RequestChannel.scala    |  36 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  97 +++---
 .../kafka/server/ReplicaFetcherThread.scala     |  15 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  33 +-
 .../unit/kafka/network/SocketServerTest.scala   |  26 +-
 .../AbstractCreateTopicsRequestTest.scala       |  20 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |  14 +-
 .../unit/kafka/server/BaseRequestTest.scala     |  70 ++--
 .../kafka/server/CreateTopicsRequestTest.scala  |  21 +-
 .../kafka/server/DeleteTopicsRequestTest.scala  |   6 +-
 .../unit/kafka/server/EdgeCaseRequestTest.scala |  20 +-
 .../unit/kafka/server/FetchRequestTest.scala    |  15 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |  32 +-
 .../unit/kafka/server/MetadataRequestTest.scala |  16 +-
 .../unit/kafka/server/ProduceRequestTest.scala  |   4 +-
 .../server/SaslApiVersionsRequestTest.scala     |  31 +-
 .../internals/InternalTopicManagerTest.java     |   4 +-
 83 files changed, 1681 insertions(+), 1819 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
index 08b8d46..a1973ad 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientRequest.java
@@ -74,9 +74,8 @@ public final class ClientRequest {
         return requestBuilder.apiKey();
     }
 
-    public RequestHeader makeHeader() {
-        return new RequestHeader(requestBuilder.apiKey().id,
-                requestBuilder.version(), clientId, correlationId);
+    public RequestHeader makeHeader(short version) {
+        return new RequestHeader(requestBuilder.apiKey().id, version, 
clientId, correlationId);
     }
 
     public AbstractRequest.Builder<?> requestBuilder() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 890bf56..4131bcb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -280,43 +280,46 @@ public class NetworkClient implements KafkaClient {
             if (!canSendRequest(nodeId))
                 throw new IllegalStateException("Attempt to send a request to 
node " + nodeId + " which is not ready.");
         }
-        AbstractRequest request = null;
         AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
         try {
             NodeApiVersions versionInfo = nodeApiVersions.get(nodeId);
+            short version;
             // Note: if versionInfo is null, we have no server version 
information. This would be
             // the case when sending the initial ApiVersionRequest which 
fetches the version
             // information itself.  It is also the case when 
discoverBrokerVersions is set to false.
             if (versionInfo == null) {
+                version = builder.desiredOrLatestVersion();
                 if (discoverBrokerVersions && log.isTraceEnabled())
                     log.trace("No version information found when sending 
message of type {} to node {}. " +
-                            "Assuming version {}.", clientRequest.apiKey(), 
nodeId, builder.version());
+                            "Assuming version {}.", clientRequest.apiKey(), 
nodeId, version);
             } else {
-                short version = 
versionInfo.usableVersion(clientRequest.apiKey());
-                builder.setVersion(version);
+                version = versionInfo.usableVersion(clientRequest.apiKey());
             }
             // The call to build may also throw UnsupportedVersionException, 
if there are essential
             // fields that cannot be represented in the chosen version.
-            request = builder.build();
+            doSend(clientRequest, isInternalRequest, now, 
builder.build(version));
         } catch (UnsupportedVersionException e) {
             // If the version is not supported, skip sending the request over 
the wire.
             // Instead, simply add it to the local queue of aborted requests.
             log.debug("Version mismatch when attempting to send {} to {}",
                     clientRequest.toString(), clientRequest.destination(), e);
-            ClientResponse clientResponse = new 
ClientResponse(clientRequest.makeHeader(),
+            ClientResponse clientResponse = new 
ClientResponse(clientRequest.makeHeader(builder.desiredOrLatestVersion()),
                     clientRequest.callback(), clientRequest.destination(), 
now, now,
                     false, e, null);
             abortedSends.add(clientResponse);
-            return;
         }
-        RequestHeader header = clientRequest.makeHeader();
+    }
+
+    private void doSend(ClientRequest clientRequest, boolean 
isInternalRequest, long now, AbstractRequest request) {
+        String nodeId = clientRequest.destination();
+        RequestHeader header = clientRequest.makeHeader(request.version());
         if (log.isDebugEnabled()) {
             int latestClientVersion = 
ProtoUtils.latestVersion(clientRequest.apiKey().id);
             if (header.apiVersion() == latestClientVersion) {
                 log.trace("Sending {} to node {}.", request, nodeId);
             } else {
                 log.debug("Using older server API v{} to send {} to node {}.",
-                    header.apiVersion(), request, nodeId);
+                        header.apiVersion(), request, nodeId);
             }
         }
         Send send = request.toSend(nodeId, header);

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 0dc073e..12ff9ce 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -697,11 +697,10 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
         if (generation == null)
             return RequestFuture.failure(new CommitFailedException());
 
-        OffsetCommitRequest.Builder builder =
-                new OffsetCommitRequest.Builder(this.groupId, offsetData).
-                        setGenerationId(generation.generationId).
-                        setMemberId(generation.memberId).
-                        
setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
+        OffsetCommitRequest.Builder builder = new 
OffsetCommitRequest.Builder(this.groupId, offsetData).
+                setGenerationId(generation.generationId).
+                setMemberId(generation.memberId).
+                setRetentionTime(OffsetCommitRequest.DEFAULT_RETENTION_TIME);
 
         log.trace("Sending OffsetCommit request with {} to coordinator {} for 
group {}", offsets, coordinator, groupId);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index e5c5cf6..5a9778c 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -343,8 +343,9 @@ public class ConsumerNetworkClient implements Closeable {
                 iterator.remove();
                 for (ClientRequest request : requestEntry.getValue()) {
                     RequestFutureCompletionHandler handler = 
(RequestFutureCompletionHandler) request.callback();
-                    handler.onComplete(new 
ClientResponse(request.makeHeader(), request.callback(), request.destination(),
-                            request.createdTimeMs(), now, true, null, null));
+                    handler.onComplete(new 
ClientResponse(request.makeHeader(request.requestBuilder().desiredOrLatestVersion()),
+                            request.callback(), request.destination(), 
request.createdTimeMs(), now, true,
+                            null, null));
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 02f34e5..655c27b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -200,11 +200,11 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
                                 long fetchOffset = 
request.fetchData().get(partition).offset;
                                 FetchResponse.PartitionData fetchData = 
entry.getValue();
                                 completedFetches.add(new 
CompletedFetch(partition, fetchOffset, fetchData, metricAggregator,
-                                        request.version()));
+                                        resp.requestHeader().apiVersion()));
                             }
 
                             
sensors.fetchLatency.record(resp.requestLatencyMs());
-                            
sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
+                            
sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
                         }
 
                         @Override
@@ -603,13 +603,12 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
      * @return A response which can be polled to obtain the corresponding 
timestamps and offsets.
      */
     private RequestFuture<Map<TopicPartition, OffsetData>> 
sendListOffsetRequest(final Node node,
-                                                                               
          final Map<TopicPartition, Long> timestampsToSearch,
-                                                                               
          boolean requireTimestamp) {
-        ListOffsetRequest.Builder builder = new 
ListOffsetRequest.Builder().setTargetTimes(timestampsToSearch);
-
-        // If we need a timestamp in the response, the minimum RPC version we 
can send is v1.
-        // Otherwise, v0 is OK.
-        builder.setMinVersion(requireTimestamp ? (short) 1 : (short) 0);
+                                                                               
  final Map<TopicPartition, Long> timestampsToSearch,
+                                                                               
  boolean requireTimestamp) {
+        // If we need a timestamp in the response, the minimum RPC version we 
can send is v1. Otherwise, v0 is OK.
+        short minVersion = requireTimestamp ? (short) 1 : (short) 0;
+        ListOffsetRequest.Builder builder = 
ListOffsetRequest.Builder.forConsumer(minVersion)
+                .setTargetTimes(timestampsToSearch);
 
         log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);
         return client.send(node, builder)
@@ -733,7 +732,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener {
         Map<Node, FetchRequest.Builder> requests = new HashMap<>();
         for (Map.Entry<Node, LinkedHashMap<TopicPartition, 
FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest.Builder fetch = new 
FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).
+            FetchRequest.Builder fetch = 
FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, 
entry.getValue()).
                     setMaxBytes(this.maxBytes);
             requests.put(node, fetch);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index f2ea420..eea1916 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -25,40 +25,39 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractRequest extends AbstractRequestResponse {
-    private final short version;
 
     public static abstract class Builder<T extends AbstractRequest> {
         private final ApiKeys apiKey;
-        private short version;
+        private final Short desiredVersion;
 
         public Builder(ApiKeys apiKey) {
+            this(apiKey, null);
+        }
+
+        public Builder(ApiKeys apiKey, Short desiredVersion) {
             this.apiKey = apiKey;
-            this.version = ProtoUtils.latestVersion(apiKey.id);
+            this.desiredVersion = desiredVersion;
         }
 
         public ApiKeys apiKey() {
             return apiKey;
         }
 
-        public Builder<T> setVersion(short version) {
-            this.version = version;
-            return this;
+        public short desiredOrLatestVersion() {
+            return desiredVersion == null ? 
ProtoUtils.latestVersion(apiKey.id) : desiredVersion;
         }
 
-        public short version() {
-            return version;
+        public T build() {
+            return build(desiredOrLatestVersion());
         }
 
-        public abstract T build();
+        public abstract T build(short version);
     }
 
-    public AbstractRequest(Struct struct, short version) {
-        super(struct);
-        this.version = version;
-    }
+    private final short version;
 
-    public Send toSend(String destination, RequestHeader header) {
-        return new NetworkSend(destination, serialize(header, this));
+    public AbstractRequest(short version) {
+        this.version = version;
     }
 
     /**
@@ -68,6 +67,19 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse {
         return version;
     }
 
+    public Send toSend(String destination, RequestHeader header) {
+        return new NetworkSend(destination, serialize(header));
+    }
+
+    /**
+     * Use with care, typically {@link #toSend(String, RequestHeader)} should 
be used instead.
+     */
+    public ByteBuffer serialize(RequestHeader header) {
+        return serialize(header.toStruct(), toStruct());
+    }
+
+    protected abstract Struct toStruct();
+
     /**
      * Get an error response for a request
      */
@@ -76,54 +88,78 @@ public abstract class AbstractRequest extends 
AbstractRequestResponse {
     /**
      * Factory method for getting a request object based on ApiKey ID and a 
buffer
      */
-    public static AbstractRequest getRequest(int requestId, short versionId, 
ByteBuffer buffer) {
+    public static RequestAndSize getRequest(int requestId, short version, 
ByteBuffer buffer) {
         ApiKeys apiKey = ApiKeys.forId(requestId);
+        Struct struct = ProtoUtils.parseRequest(apiKey.id, version, buffer);
+        AbstractRequest request;
         switch (apiKey) {
             case PRODUCE:
-                return ProduceRequest.parse(buffer, versionId);
+                request = new ProduceRequest(struct, version);
+                break;
             case FETCH:
-                return FetchRequest.parse(buffer, versionId);
+                request = new FetchRequest(struct, version);
+                break;
             case LIST_OFFSETS:
-                return ListOffsetRequest.parse(buffer, versionId);
+                request = new ListOffsetRequest(struct, version);
+                break;
             case METADATA:
-                return MetadataRequest.parse(buffer, versionId);
+                request = new MetadataRequest(struct, version);
+                break;
             case OFFSET_COMMIT:
-                return OffsetCommitRequest.parse(buffer, versionId);
+                request = new OffsetCommitRequest(struct, version);
+                break;
             case OFFSET_FETCH:
-                return OffsetFetchRequest.parse(buffer, versionId);
+                request = new OffsetFetchRequest(struct, version);
+                break;
             case GROUP_COORDINATOR:
-                return GroupCoordinatorRequest.parse(buffer, versionId);
+                request = new GroupCoordinatorRequest(struct, version);
+                break;
             case JOIN_GROUP:
-                return JoinGroupRequest.parse(buffer, versionId);
+                request = new JoinGroupRequest(struct, version);
+                break;
             case HEARTBEAT:
-                return HeartbeatRequest.parse(buffer, versionId);
+                request = new HeartbeatRequest(struct, version);
+                break;
             case LEAVE_GROUP:
-                return LeaveGroupRequest.parse(buffer, versionId);
+                request = new LeaveGroupRequest(struct, version);
+                break;
             case SYNC_GROUP:
-                return SyncGroupRequest.parse(buffer, versionId);
+                request = new SyncGroupRequest(struct, version);
+                break;
             case STOP_REPLICA:
-                return StopReplicaRequest.parse(buffer, versionId);
+                request = new StopReplicaRequest(struct, version);
+                break;
             case CONTROLLED_SHUTDOWN_KEY:
-                return ControlledShutdownRequest.parse(buffer, versionId);
+                request = new ControlledShutdownRequest(struct, version);
+                break;
             case UPDATE_METADATA_KEY:
-                return UpdateMetadataRequest.parse(buffer, versionId);
+                request = new UpdateMetadataRequest(struct, version);
+                break;
             case LEADER_AND_ISR:
-                return LeaderAndIsrRequest.parse(buffer, versionId);
+                request = new LeaderAndIsrRequest(struct, version);
+                break;
             case DESCRIBE_GROUPS:
-                return DescribeGroupsRequest.parse(buffer, versionId);
+                request = new DescribeGroupsRequest(struct, version);
+                break;
             case LIST_GROUPS:
-                return ListGroupsRequest.parse(buffer, versionId);
+                request = new ListGroupsRequest(struct, version);
+                break;
             case SASL_HANDSHAKE:
-                return SaslHandshakeRequest.parse(buffer, versionId);
+                request = new SaslHandshakeRequest(struct, version);
+                break;
             case API_VERSIONS:
-                return ApiVersionsRequest.parse(buffer, versionId);
+                request = new ApiVersionsRequest(struct, version);
+                break;
             case CREATE_TOPICS:
-                return CreateTopicsRequest.parse(buffer, versionId);
+                request = new CreateTopicsRequest(struct, version);
+                break;
             case DELETE_TOPICS:
-                return DeleteTopicsRequest.parse(buffer, versionId);
+                request = new DeleteTopicsRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not 
currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));
         }
+        return new RequestAndSize(request, struct.sizeOf());
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
index 3ad16a5..00ddf71 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
@@ -17,56 +17,13 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractRequestResponse {
-    protected final Struct struct;
-
-    public AbstractRequestResponse(Struct struct) {
-        this.struct = struct;
-    }
-
-    public Struct toStruct() {
-        return struct;
-    }
-
-    /**
-     * Get the serialized size of this object
-     */
-    public int sizeOf() {
-        return struct.sizeOf();
-    }
-
     /**
-     * Write this object to a buffer
+     * Visible for testing.
      */
-    public void writeTo(ByteBuffer buffer) {
-        struct.writeTo(buffer);
-    }
-
-    @Override
-    public String toString() {
-        return struct.toString();
-    }
-
-    @Override
-    public int hashCode() {
-        return struct.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        AbstractRequestResponse other = (AbstractRequestResponse) obj;
-        return struct.equals(other.struct);
-    }
-
-    public static ByteBuffer serialize(AbstractRequestResponse header, 
AbstractRequestResponse body) {
-        ByteBuffer buffer = ByteBuffer.allocate(header.sizeOf() + 
body.sizeOf());
-        header.writeTo(buffer);
-        body.writeTo(buffer);
+    public static ByteBuffer serialize(Struct headerStruct, Struct bodyStruct) 
{
+        ByteBuffer buffer = ByteBuffer.allocate(headerStruct.sizeOf() + 
bodyStruct.sizeOf());
+        headerStruct.writeTo(buffer);
+        bodyStruct.writeTo(buffer);
         buffer.rewind();
         return buffer;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 8bbc25a..a21e340 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -21,17 +21,32 @@ import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.types.Struct;
 
+import java.nio.ByteBuffer;
+
 public abstract class AbstractResponse extends AbstractRequestResponse {
 
-    public AbstractResponse(Struct struct) {
-        super(struct);
+    public Send toSend(String destination, RequestHeader requestHeader) {
+        return toSend(destination, requestHeader.apiVersion(), 
requestHeader.toResponseHeader());
+    }
+
+    /**
+     * This should only be used if we need to return a response with a 
different version than the request, which
+     * should be very rare (an example is @link {@link 
ApiVersionsResponse#unsupportedVersionSend(String, RequestHeader)}).
+     * Typically {@link #toSend(String, RequestHeader)} should be used.
+     */
+    public Send toSend(String destination, short version, ResponseHeader 
responseHeader) {
+        return new NetworkSend(destination, serialize(version, 
responseHeader));
     }
 
-    public Send toSend(String destination, RequestHeader request) {
-        ResponseHeader responseHeader = new 
ResponseHeader(request.correlationId());
-        return new NetworkSend(destination, serialize(responseHeader, this));
+    /**
+     * Visible for testing, typically {@link #toSend(String, RequestHeader)} 
should be used instead.
+     */
+    public ByteBuffer serialize(short version, ResponseHeader responseHeader) {
+        return serialize(responseHeader.toStruct(), toStruct(version));
     }
 
+    protected abstract Struct toStruct(short version);
+
     public static AbstractResponse getResponse(int requestId, Struct struct) {
         ApiKeys apiKey = ApiKeys.forId(requestId);
         switch (apiKey) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
index fe7c348..7d40900 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java
@@ -22,13 +22,14 @@ import java.util.Collections;
 
 public class ApiVersionsRequest extends AbstractRequest {
     public static class Builder extends 
AbstractRequest.Builder<ApiVersionsRequest> {
+
         public Builder() {
             super(ApiKeys.API_VERSIONS);
         }
 
         @Override
-        public ApiVersionsRequest build() {
-            return new ApiVersionsRequest(version());
+        public ApiVersionsRequest build(short version) {
+            return new ApiVersionsRequest(version);
         }
 
         @Override
@@ -38,12 +39,16 @@ public class ApiVersionsRequest extends AbstractRequest {
     }
 
     public ApiVersionsRequest(short version) {
-        this(new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 
version)),
-                version);
+        super(version);
     }
 
-    public ApiVersionsRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public ApiVersionsRequest(Struct struct, short version) {
+        super(version);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        return new Struct(ProtoUtils.requestSchema(ApiKeys.API_VERSIONS.id, 
version()));
     }
 
     @Override
@@ -58,13 +63,8 @@ public class ApiVersionsRequest extends AbstractRequest {
         }
     }
 
-    public static ApiVersionsRequest parse(ByteBuffer buffer, int versionId) {
-        return new ApiVersionsRequest(
-                ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, versionId, 
buffer),
-                (short) versionId);
+    public static ApiVersionsRequest parse(ByteBuffer buffer, short version) {
+        return new 
ApiVersionsRequest(ProtoUtils.parseRequest(ApiKeys.API_VERSIONS.id, version, 
buffer), version);
     }
 
-    public static ApiVersionsRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.API_VERSIONS.id));
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 7d8bcc5..0066c08 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -12,10 +12,10 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -28,8 +28,6 @@ import java.util.Map;
 
 public class ApiVersionsResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.API_VERSIONS.id);
-
     public static final ApiVersionsResponse API_VERSIONS_RESPONSE = 
createApiVersionsResponse();
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
@@ -67,23 +65,11 @@ public class ApiVersionsResponse extends AbstractResponse {
     }
 
     public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) {
-        super(new Struct(CURRENT_SCHEMA));
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-        List<Struct> apiVersionList = new ArrayList<>();
-        for (ApiVersion apiVersion : apiVersions) {
-            Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
-            apiVersionStruct.set(API_KEY_NAME, apiVersion.apiKey);
-            apiVersionStruct.set(MIN_VERSION_KEY_NAME, apiVersion.minVersion);
-            apiVersionStruct.set(MAX_VERSION_KEY_NAME, apiVersion.maxVersion);
-            apiVersionList.add(apiVersionStruct);
-        }
-        struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray());
         this.error = error;
         this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions);
     }
 
     public ApiVersionsResponse(Struct struct) {
-        super(struct);
         this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         List<ApiVersion> tempApiVersions = new ArrayList<>();
         for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) {
@@ -96,6 +82,31 @@ public class ApiVersionsResponse extends AbstractResponse {
         this.apiKeyToApiVersion = buildApiKeyToApiVersion(tempApiVersions);
     }
 
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.API_VERSIONS.id, version));
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        List<Struct> apiVersionList = new ArrayList<>();
+        for (ApiVersion apiVersion : apiKeyToApiVersion.values()) {
+            Struct apiVersionStruct = struct.instance(API_VERSIONS_KEY_NAME);
+            apiVersionStruct.set(API_KEY_NAME, apiVersion.apiKey);
+            apiVersionStruct.set(MIN_VERSION_KEY_NAME, apiVersion.minVersion);
+            apiVersionStruct.set(MAX_VERSION_KEY_NAME, apiVersion.maxVersion);
+            apiVersionList.add(apiVersionStruct);
+        }
+        struct.set(API_VERSIONS_KEY_NAME, apiVersionList.toArray());
+        return struct;
+    }
+
+    /**
+     * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we 
don't support the requested version.
+     */
+    public static Send unsupportedVersionSend(String destination, 
RequestHeader requestHeader) {
+        ApiVersionsResponse response = new 
ApiVersionsResponse(Errors.UNSUPPORTED_VERSION,
+                Collections.<ApiVersion>emptyList());
+        return response.toSend(destination, (short) 0, 
requestHeader.toResponseHeader());
+    }
+
     public Collection<ApiVersion> apiVersions() {
         return apiKeyToApiVersion.values();
     }
@@ -108,12 +119,8 @@ public class ApiVersionsResponse extends AbstractResponse {
         return error;
     }
 
-    public static ApiVersionsResponse parse(ByteBuffer buffer) {
-        return new ApiVersionsResponse(CURRENT_SCHEMA.read(buffer));
-    }
-
-    public static ApiVersionsResponse fromError(Errors error) {
-        return new ApiVersionsResponse(error, 
Collections.<ApiVersion>emptyList());
+    public static ApiVersionsResponse parse(ByteBuffer buffer, short version) {
+        return new 
ApiVersionsResponse(ProtoUtils.responseSchema(ApiKeys.API_VERSIONS.id, 
version).read(buffer));
     }
 
     private static ApiVersionsResponse createApiVersionsResponse() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
index 8f44e5c..679e5dd 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java
@@ -33,8 +33,8 @@ public class ControlledShutdownRequest extends 
AbstractRequest {
         }
 
         @Override
-        public ControlledShutdownRequest build() {
-            return new ControlledShutdownRequest(brokerId, version());
+        public ControlledShutdownRequest build(short version) {
+            return new ControlledShutdownRequest(brokerId, version);
         }
 
         @Override
@@ -49,14 +49,12 @@ public class ControlledShutdownRequest extends 
AbstractRequest {
     private int brokerId;
 
     private ControlledShutdownRequest(int brokerId, short version) {
-        super(new 
Struct(ProtoUtils.requestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version)),
-                version);
-        struct.set(BROKER_ID_KEY_NAME, brokerId);
+        super(version);
         this.brokerId = brokerId;
     }
 
-    public ControlledShutdownRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public ControlledShutdownRequest(Struct struct, short version) {
+        super(version);
         brokerId = struct.getInt(BROKER_ID_KEY_NAME);
     }
 
@@ -79,12 +77,15 @@ public class ControlledShutdownRequest extends 
AbstractRequest {
         return brokerId;
     }
 
-    public static ControlledShutdownRequest parse(ByteBuffer buffer, int 
versionId) {
+    public static ControlledShutdownRequest parse(ByteBuffer buffer, short 
versionId) {
         return new ControlledShutdownRequest(
-                ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, 
versionId, buffer), (short) versionId);
+                ProtoUtils.parseRequest(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, 
versionId, buffer), versionId);
     }
 
-    public static ControlledShutdownRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id));
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version()));
+        struct.set(BROKER_ID_KEY_NAME, brokerId);
+        return struct;
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
index b3922f9..a2cac6c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownResponse.java
@@ -16,7 +16,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,8 +26,6 @@ import java.util.Set;
 
 public class ControlledShutdownResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
-
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String PARTITIONS_REMAINING_KEY_NAME = 
"partitions_remaining";
 
@@ -47,25 +44,11 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
     private final Set<TopicPartition> partitionsRemaining;
 
     public ControlledShutdownResponse(Errors error, Set<TopicPartition> 
partitionsRemaining) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        struct.set(ERROR_CODE_KEY_NAME, error.code());
-
-        List<Struct> partitionsRemainingList = new 
ArrayList<>(partitionsRemaining.size());
-        for (TopicPartition topicPartition : partitionsRemaining) {
-            Struct topicPartitionStruct = 
struct.instance(PARTITIONS_REMAINING_KEY_NAME);
-            topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
-            topicPartitionStruct.set(PARTITION_KEY_NAME, 
topicPartition.partition());
-            partitionsRemainingList.add(topicPartitionStruct);
-        }
-        struct.set(PARTITIONS_REMAINING_KEY_NAME, 
partitionsRemainingList.toArray());
-
         this.error = error;
         this.partitionsRemaining = partitionsRemaining;
     }
 
     public ControlledShutdownResponse(Struct struct) {
-        super(struct);
         error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
         Set<TopicPartition> partitions = new HashSet<>();
         for (Object topicPartitionObj : 
struct.getArray(PARTITIONS_REMAINING_KEY_NAME)) {
@@ -85,12 +68,25 @@ public class ControlledShutdownResponse extends 
AbstractResponse {
         return partitionsRemaining;
     }
 
-    public static ControlledShutdownResponse parse(ByteBuffer buffer) {
-        return new ControlledShutdownResponse(CURRENT_SCHEMA.read(buffer));
-    }
-
-    public static ControlledShutdownResponse parse(ByteBuffer buffer, int 
version) {
+    public static ControlledShutdownResponse parse(ByteBuffer buffer, short 
version) {
         return new 
ControlledShutdownResponse(ProtoUtils.parseResponse(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id,
 version, buffer));
     }
 
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id, version));
+
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+
+        List<Struct> partitionsRemainingList = new 
ArrayList<>(partitionsRemaining.size());
+        for (TopicPartition topicPartition : partitionsRemaining) {
+            Struct topicPartitionStruct = 
struct.instance(PARTITIONS_REMAINING_KEY_NAME);
+            topicPartitionStruct.set(TOPIC_KEY_NAME, topicPartition.topic());
+            topicPartitionStruct.set(PARTITION_KEY_NAME, 
topicPartition.partition());
+            partitionsRemainingList.add(topicPartitionStruct);
+        }
+        struct.set(PARTITIONS_REMAINING_KEY_NAME, 
partitionsRemainingList.toArray());
+
+        return struct;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index 0a4bce0..a8f8c5e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -113,11 +113,11 @@ public class CreateTopicsRequest extends AbstractRequest {
         }
 
         @Override
-        public CreateTopicsRequest build() {
-            if (validateOnly && version() == 0)
+        public CreateTopicsRequest build(short version) {
+            if (validateOnly && version == 0)
                 throw new UnsupportedVersionException("validateOnly is not 
supported in version 0 of " +
                         "CreateTopicsRequest");
-            return new CreateTopicsRequest(topics, timeout, validateOnly, 
version());
+            return new CreateTopicsRequest(topics, timeout, validateOnly, 
version);
         }
 
         @Override
@@ -144,53 +144,15 @@ public class CreateTopicsRequest extends AbstractRequest {
     public static final short NO_REPLICATION_FACTOR = -1;
 
     private CreateTopicsRequest(Map<String, TopicDetails> topics, Integer 
timeout, boolean validateOnly, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.CREATE_TOPICS.id, 
version)), version);
-
-        List<Struct> createTopicRequestStructs = new 
ArrayList<>(topics.size());
-        for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
-
-            Struct singleRequestStruct = struct.instance(REQUESTS_KEY_NAME);
-            String topic = entry.getKey();
-            TopicDetails args = entry.getValue();
-
-            singleRequestStruct.set(TOPIC_KEY_NAME, topic);
-            singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, 
args.numPartitions);
-            singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, 
args.replicationFactor);
-
-            // replica assignment
-            List<Struct> replicaAssignmentsStructs = new 
ArrayList<>(args.replicasAssignments.size());
-            for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment 
: args.replicasAssignments.entrySet()) {
-                Struct replicaAssignmentStruct = 
singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
-                
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME, 
partitionReplicaAssignment.getKey());
-                
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, 
partitionReplicaAssignment.getValue().toArray());
-                replicaAssignmentsStructs.add(replicaAssignmentStruct);
-            }
-            singleRequestStruct.set(REPLICA_ASSIGNMENT_KEY_NAME, 
replicaAssignmentsStructs.toArray());
-
-            // configs
-            List<Struct> configsStructs = new ArrayList<>(args.configs.size());
-            for (Map.Entry<String, String> configEntry : 
args.configs.entrySet()) {
-                Struct configStruct = 
singleRequestStruct.instance(CONFIGS_KEY_NAME);
-                configStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey());
-                configStruct.set(CONFIG_VALUE_KEY_NAME, 
configEntry.getValue());
-                configsStructs.add(configStruct);
-            }
-            singleRequestStruct.set(CONFIGS_KEY_NAME, 
configsStructs.toArray());
-            createTopicRequestStructs.add(singleRequestStruct);
-        }
-        struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-        if (version >= 1)
-            struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
-
+        super(version);
         this.topics = topics;
         this.timeout = timeout;
         this.validateOnly = validateOnly;
         this.duplicateTopics = Collections.emptySet();
     }
 
-    public CreateTopicsRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public CreateTopicsRequest(Struct struct, short version) {
+        super(version);
 
         Object[] requestStructs = struct.getArray(REQUESTS_KEY_NAME);
         Map<String, TopicDetails> topics = new HashMap<>();
@@ -262,7 +224,7 @@ public class CreateTopicsRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
             case 1:
-                return new CreateTopicsResponse(topicErrors, versionId);
+                return new CreateTopicsResponse(topicErrors);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                     versionId, this.getClass().getSimpleName(), 
ProtoUtils.latestVersion(ApiKeys.CREATE_TOPICS.id)));
@@ -285,13 +247,55 @@ public class CreateTopicsRequest extends AbstractRequest {
         return this.duplicateTopics;
     }
 
-    public static CreateTopicsRequest parse(ByteBuffer buffer, int versionId) {
-        return new CreateTopicsRequest(
-                ProtoUtils.parseRequest(ApiKeys.CREATE_TOPICS.id, versionId, 
buffer),
-                (short) versionId);
+    public static CreateTopicsRequest parse(ByteBuffer buffer, short 
versionId) {
+        return new 
CreateTopicsRequest(ProtoUtils.parseRequest(ApiKeys.CREATE_TOPICS.id, 
versionId, buffer), versionId);
     }
 
-    public static CreateTopicsRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.CREATE_TOPICS.id));
+    /**
+     * Visible for testing.
+     */
+    @Override
+    public Struct toStruct() {
+        short version = version();
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.CREATE_TOPICS.id, version));
+
+        List<Struct> createTopicRequestStructs = new 
ArrayList<>(topics.size());
+        for (Map.Entry<String, TopicDetails> entry : topics.entrySet()) {
+
+            Struct singleRequestStruct = struct.instance(REQUESTS_KEY_NAME);
+            String topic = entry.getKey();
+            TopicDetails args = entry.getValue();
+
+            singleRequestStruct.set(TOPIC_KEY_NAME, topic);
+            singleRequestStruct.set(NUM_PARTITIONS_KEY_NAME, 
args.numPartitions);
+            singleRequestStruct.set(REPLICATION_FACTOR_KEY_NAME, 
args.replicationFactor);
+
+            // replica assignment
+            List<Struct> replicaAssignmentsStructs = new 
ArrayList<>(args.replicasAssignments.size());
+            for (Map.Entry<Integer, List<Integer>> partitionReplicaAssignment 
: args.replicasAssignments.entrySet()) {
+                Struct replicaAssignmentStruct = 
singleRequestStruct.instance(REPLICA_ASSIGNMENT_KEY_NAME);
+                
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME, 
partitionReplicaAssignment.getKey());
+                
replicaAssignmentStruct.set(REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME, 
partitionReplicaAssignment.getValue().toArray());
+                replicaAssignmentsStructs.add(replicaAssignmentStruct);
+            }
+            singleRequestStruct.set(REPLICA_ASSIGNMENT_KEY_NAME, 
replicaAssignmentsStructs.toArray());
+
+            // configs
+            List<Struct> configsStructs = new ArrayList<>(args.configs.size());
+            for (Map.Entry<String, String> configEntry : 
args.configs.entrySet()) {
+                Struct configStruct = 
singleRequestStruct.instance(CONFIGS_KEY_NAME);
+                configStruct.set(CONFIG_KEY_KEY_NAME, configEntry.getKey());
+                configStruct.set(CONFIG_VALUE_KEY_NAME, 
configEntry.getValue());
+                configsStructs.add(configStruct);
+            }
+            singleRequestStruct.set(CONFIGS_KEY_NAME, 
configsStructs.toArray());
+            createTopicRequestStructs.add(singleRequestStruct);
+        }
+        struct.set(REQUESTS_KEY_NAME, createTopicRequestStructs.toArray());
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        if (version >= 1)
+            struct.set(VALIDATE_ONLY_KEY_NAME, validateOnly);
+        return struct;
+
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index b09795f..01b7c2b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -20,7 +20,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -30,8 +29,6 @@ import java.util.List;
 import java.util.Map;
 
 public class CreateTopicsResponse extends AbstractResponse {
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.CREATE_TOPICS.id);
-
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -87,27 +84,11 @@ public class CreateTopicsResponse extends AbstractResponse {
 
     private final Map<String, Error> errors;
 
-    public CreateTopicsResponse(Map<String, Error> errors, short version) {
-        super(new Struct(ProtoUtils.responseSchema(ApiKeys.CREATE_TOPICS.id, 
version)));
-
-        List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, Error> topicError : errors.entrySet()) {
-            Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
-            topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
-            Error error = topicError.getValue();
-            topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code());
-            if (version >= 1)
-                topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message());
-            topicErrorsStructs.add(topicErrorsStruct);
-        }
-        struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
-
+    public CreateTopicsResponse(Map<String, Error> errors) {
         this.errors = errors;
     }
 
     public CreateTopicsResponse(Struct struct) {
-        super(struct);
-
         Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
         Map<String, Error> errors = new HashMap<>();
         for (Object topicErrorStructObj : topicErrorStructs) {
@@ -123,15 +104,29 @@ public class CreateTopicsResponse extends 
AbstractResponse {
         this.errors = errors;
     }
 
-    public Map<String, Error> errors() {
-        return errors;
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.CREATE_TOPICS.id, version));
+
+        List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Error> topicError : errors.entrySet()) {
+            Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
+            topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
+            Error error = topicError.getValue();
+            topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code());
+            if (version >= 1)
+                topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message());
+            topicErrorsStructs.add(topicErrorsStruct);
+        }
+        struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
+        return struct;
     }
 
-    public static CreateTopicsResponse parse(ByteBuffer buffer) {
-        return new CreateTopicsResponse(CURRENT_SCHEMA.read(buffer));
+    public Map<String, Error> errors() {
+        return errors;
     }
 
-    public static CreateTopicsResponse parse(ByteBuffer buffer, int version) {
+    public static CreateTopicsResponse parse(ByteBuffer buffer, short version) 
{
         return new 
CreateTopicsResponse(ProtoUtils.responseSchema(ApiKeys.CREATE_TOPICS.id, 
version).read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
index 2874fad..eea4aa9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
@@ -46,8 +46,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
         }
 
         @Override
-        public DeleteTopicsRequest build() {
-            return new DeleteTopicsRequest(topics, timeout, version());
+        public DeleteTopicsRequest build(short version) {
+            return new DeleteTopicsRequest(topics, timeout, version);
         }
 
         @Override
@@ -62,17 +62,13 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     private DeleteTopicsRequest(Set<String> topics, Integer timeout, short 
version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.DELETE_TOPICS.id, 
version)), version);
-
-        struct.set(TOPICS_KEY_NAME, topics.toArray());
-        struct.set(TIMEOUT_KEY_NAME, timeout);
-
+        super(version);
         this.topics = topics;
         this.timeout = timeout;
     }
 
     public DeleteTopicsRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME);
         Set<String> topics = new HashSet<>(topicsArray.length);
         for (Object topic : topicsArray)
@@ -83,6 +79,14 @@ public class DeleteTopicsRequest extends AbstractRequest {
     }
 
     @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.DELETE_TOPICS.id, version()));
+        struct.set(TOPICS_KEY_NAME, topics.toArray());
+        struct.set(TIMEOUT_KEY_NAME, timeout);
+        return struct;
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         Map<String, Errors> topicErrors = new HashMap<>();
         for (String topic : topics)
@@ -105,12 +109,8 @@ public class DeleteTopicsRequest extends AbstractRequest {
         return this.timeout;
     }
 
-    public static DeleteTopicsRequest parse(ByteBuffer buffer, int versionId) {
-        return new 
DeleteTopicsRequest(ProtoUtils.parseRequest(ApiKeys.DELETE_TOPICS.id, 
versionId, buffer),
-                (short) versionId);
+    public static DeleteTopicsRequest parse(ByteBuffer buffer, short 
versionId) {
+        return new 
DeleteTopicsRequest(ProtoUtils.parseRequest(ApiKeys.DELETE_TOPICS.id, 
versionId, buffer), versionId);
     }
 
-    public static DeleteTopicsRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id));
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 5c8b3d5..c47d098 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -19,7 +19,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -29,7 +28,6 @@ import java.util.List;
 import java.util.Map;
 
 public class DeleteTopicsResponse extends AbstractResponse {
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.DELETE_TOPICS.id);
     private static final String TOPIC_ERROR_CODES_KEY_NAME = 
"topic_error_codes";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -45,23 +43,10 @@ public class DeleteTopicsResponse extends AbstractResponse {
     private final Map<String, Errors> errors;
 
     public DeleteTopicsResponse(Map<String, Errors> errors) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
-            Struct topicErrorCodeStruct = 
struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
-            topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
-            topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, 
topicError.getValue().code());
-            topicErrorCodeStructs.add(topicErrorCodeStruct);
-        }
-        struct.set(TOPIC_ERROR_CODES_KEY_NAME, 
topicErrorCodeStructs.toArray());
-
         this.errors = errors;
     }
 
     public DeleteTopicsResponse(Struct struct) {
-        super(struct);
-
         Object[] topicErrorCodesStructs = 
struct.getArray(TOPIC_ERROR_CODES_KEY_NAME);
         Map<String, Errors> errors = new HashMap<>();
         for (Object topicErrorCodeStructObj : topicErrorCodesStructs) {
@@ -74,15 +59,25 @@ public class DeleteTopicsResponse extends AbstractResponse {
         this.errors = errors;
     }
 
-    public Map<String, Errors> errors() {
-        return errors;
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.DELETE_TOPICS.id, version));
+        List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size());
+        for (Map.Entry<String, Errors> topicError : errors.entrySet()) {
+            Struct topicErrorCodeStruct = 
struct.instance(TOPIC_ERROR_CODES_KEY_NAME);
+            topicErrorCodeStruct.set(TOPIC_KEY_NAME, topicError.getKey());
+            topicErrorCodeStruct.set(ERROR_CODE_KEY_NAME, 
topicError.getValue().code());
+            topicErrorCodeStructs.add(topicErrorCodeStruct);
+        }
+        struct.set(TOPIC_ERROR_CODES_KEY_NAME, 
topicErrorCodeStructs.toArray());
+        return struct;
     }
 
-    public static DeleteTopicsResponse parse(ByteBuffer buffer) {
-        return new DeleteTopicsResponse(CURRENT_SCHEMA.read(buffer));
+    public Map<String, Errors> errors() {
+        return errors;
     }
 
-    public static DeleteTopicsResponse parse(ByteBuffer buffer, int version) {
+    public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) 
{
         return new 
DeleteTopicsResponse(ProtoUtils.responseSchema(ApiKeys.DELETE_TOPICS.id, 
version).read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
index f17cdd9..0f13371 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java
@@ -34,8 +34,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
         }
 
         @Override
-        public DescribeGroupsRequest build() {
-            short version = version();
+        public DescribeGroupsRequest build(short version) {
             return new DescribeGroupsRequest(this.groupIds, version);
         }
 
@@ -48,14 +47,12 @@ public class DescribeGroupsRequest extends AbstractRequest {
     private final List<String> groupIds;
 
     private DescribeGroupsRequest(List<String> groupIds, short version) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.DESCRIBE_GROUPS.id, 
version)),
-                version);
-        struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
+        super(version);
         this.groupIds = groupIds;
     }
 
     public DescribeGroupsRequest(Struct struct, short version) {
-        super(struct, version);
+        super(version);
         this.groupIds = new ArrayList<>();
         for (Object groupId : struct.getArray(GROUP_IDS_KEY_NAME))
             this.groupIds.add((String) groupId);
@@ -66,6 +63,13 @@ public class DescribeGroupsRequest extends AbstractRequest {
     }
 
     @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ProtoUtils.requestSchema(ApiKeys.DESCRIBE_GROUPS.id, version()));
+        struct.set(GROUP_IDS_KEY_NAME, groupIds.toArray());
+        return struct;
+    }
+
+    @Override
     public AbstractResponse getErrorResponse(Throwable e) {
         short versionId = version();
         switch (versionId) {
@@ -78,12 +82,7 @@ public class DescribeGroupsRequest extends AbstractRequest {
         }
     }
 
-    public static DescribeGroupsRequest parse(ByteBuffer buffer, int 
versionId) {
-        return new 
DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, 
versionId, buffer),
-                (short) versionId);
-    }
-
-    public static DescribeGroupsRequest parse(ByteBuffer buffer) {
-        return parse(buffer, 
ProtoUtils.latestVersion(ApiKeys.DESCRIBE_GROUPS.id));
+    public static DescribeGroupsRequest parse(ByteBuffer buffer, short 
versionId) {
+        return new 
DescribeGroupsRequest(ProtoUtils.parseRequest(ApiKeys.DESCRIBE_GROUPS.id, 
versionId, buffer), versionId);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index 56b387e..0dde987 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.ProtoUtils;
-import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -27,8 +26,6 @@ import java.util.Map;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
-    private static final Schema CURRENT_SCHEMA = 
ProtoUtils.currentResponseSchema(ApiKeys.DESCRIBE_GROUPS.id);
-
     private static final String GROUPS_KEY_NAME = "groups";
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";
@@ -60,36 +57,10 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
     private final Map<String, GroupMetadata> groups;
 
     public DescribeGroupsResponse(Map<String, GroupMetadata> groups) {
-        super(new Struct(CURRENT_SCHEMA));
-
-        List<Struct> groupStructs = new ArrayList<>();
-        for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
-            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
-            GroupMetadata group = groupEntry.getValue();
-            groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
-            groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code());
-            groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
-            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
-            groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
-            List<Struct> membersList = new ArrayList<>();
-            for (GroupMember member : group.members) {
-                Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
-                memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
-                memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
-                memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
-                memberStruct.set(MEMBER_METADATA_KEY_NAME, 
member.memberMetadata);
-                memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, 
member.memberAssignment);
-                membersList.add(memberStruct);
-            }
-            groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
-            groupStructs.add(groupStruct);
-        }
-        struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
         this.groups = groups;
     }
 
     public DescribeGroupsResponse(Struct struct) {
-        super(struct);
         this.groups = new HashMap<>();
         for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) {
             Struct groupStruct = (Struct) groupObj;
@@ -209,10 +180,6 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
         }
     }
 
-    public static DescribeGroupsResponse parse(ByteBuffer buffer) {
-        return new DescribeGroupsResponse(CURRENT_SCHEMA.read(buffer));
-    }
-
     public static DescribeGroupsResponse fromError(Errors error, List<String> 
groupIds) {
         GroupMetadata errorMetadata = GroupMetadata.forError(error);
         Map<String, GroupMetadata> groups = new HashMap<>();
@@ -221,4 +188,38 @@ public class DescribeGroupsResponse extends 
AbstractResponse {
         return new DescribeGroupsResponse(groups);
     }
 
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ProtoUtils.responseSchema(ApiKeys.DESCRIBE_GROUPS.id, version));
+
+        List<Struct> groupStructs = new ArrayList<>();
+        for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) {
+            Struct groupStruct = struct.instance(GROUPS_KEY_NAME);
+            GroupMetadata group = groupEntry.getValue();
+            groupStruct.set(GROUP_ID_KEY_NAME, groupEntry.getKey());
+            groupStruct.set(ERROR_CODE_KEY_NAME, group.error.code());
+            groupStruct.set(GROUP_STATE_KEY_NAME, group.state);
+            groupStruct.set(PROTOCOL_TYPE_KEY_NAME, group.protocolType);
+            groupStruct.set(PROTOCOL_KEY_NAME, group.protocol);
+            List<Struct> membersList = new ArrayList<>();
+            for (GroupMember member : group.members) {
+                Struct memberStruct = groupStruct.instance(MEMBERS_KEY_NAME);
+                memberStruct.set(MEMBER_ID_KEY_NAME, member.memberId);
+                memberStruct.set(CLIENT_ID_KEY_NAME, member.clientId);
+                memberStruct.set(CLIENT_HOST_KEY_NAME, member.clientHost);
+                memberStruct.set(MEMBER_METADATA_KEY_NAME, 
member.memberMetadata);
+                memberStruct.set(MEMBER_ASSIGNMENT_KEY_NAME, 
member.memberAssignment);
+                membersList.add(memberStruct);
+            }
+            groupStruct.set(MEMBERS_KEY_NAME, membersList.toArray());
+            groupStructs.add(groupStruct);
+        }
+        struct.set(GROUPS_KEY_NAME, groupStructs.toArray());
+
+        return struct;
+    }
+
+    public static DescribeGroupsResponse parse(ByteBuffer buffer, short 
versionId) {
+        return new 
DescribeGroupsResponse(ProtoUtils.parseResponse(ApiKeys.DESCRIBE_GROUPS.id, 
versionId, buffer));
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/fc1cfe47/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 4f270e1..55fd286 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -92,27 +92,32 @@ public class FetchRequest extends AbstractRequest {
     }
 
     public static class Builder extends AbstractRequest.Builder<FetchRequest> {
-        private int replicaId = CONSUMER_REPLICA_ID;
-        private int maxWait;
+        private final int maxWait;
         private final int minBytes;
+        private final int replicaId;
+        private final LinkedHashMap<TopicPartition, PartitionData> fetchData;
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
-        private LinkedHashMap<TopicPartition, PartitionData> fetchData;
 
-        public Builder(int maxWait, int minBytes, 
LinkedHashMap<TopicPartition, PartitionData> fetchData) {
-            super(ApiKeys.FETCH);
-            this.maxWait = maxWait;
-            this.minBytes = minBytes;
-            this.fetchData = fetchData;
+        public static Builder forConsumer(int maxWait, int minBytes, 
LinkedHashMap<TopicPartition, PartitionData> fetchData) {
+            return new Builder(null, CONSUMER_REPLICA_ID, maxWait, minBytes, 
fetchData);
         }
 
-        public Builder setReplicaId(int replicaId) {
-            this.replicaId = replicaId;
-            return this;
+        public static Builder forReplica(short desiredVersion, int replicaId, 
int maxWait, int minBytes,
+                                         LinkedHashMap<TopicPartition, 
PartitionData> fetchData) {
+            return new Builder(desiredVersion, replicaId, maxWait, minBytes, 
fetchData);
         }
 
-        public Builder setMaxWait(int maxWait) {
+        private Builder(Short desiredVersion, int replicaId, int maxWait, int 
minBytes,
+                        LinkedHashMap<TopicPartition, PartitionData> 
fetchData) {
+            super(ApiKeys.FETCH, desiredVersion);
+            this.replicaId = replicaId;
             this.maxWait = maxWait;
-            return this;
+            this.minBytes = minBytes;
+            this.fetchData = fetchData;
+        }
+
+        public LinkedHashMap<TopicPartition, PartitionData> fetchData() {
+            return this.fetchData;
         }
 
         public Builder setMaxBytes(int maxBytes) {
@@ -120,19 +125,13 @@ public class FetchRequest extends AbstractRequest {
             return this;
         }
 
-        public LinkedHashMap<TopicPartition, PartitionData> fetchData() {
-            return this.fetchData;
-        }
-
         @Override
-        public FetchRequest build() {
-            short version = version();
+        public FetchRequest build(short version) {
             if (version < 3) {
                 maxBytes = -1;
             }
 
-            return new FetchRequest(version, replicaId, maxWait, minBytes,
-                    maxBytes, fetchData);
+            return new FetchRequest(version, replicaId, maxWait, minBytes, 
maxBytes, fetchData);
         }
 
         @Override
@@ -151,31 +150,7 @@ public class FetchRequest extends AbstractRequest {
 
     private FetchRequest(short version, int replicaId, int maxWait, int 
minBytes, int maxBytes,
                          LinkedHashMap<TopicPartition, PartitionData> 
fetchData) {
-        super(new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, version)), 
version);
-        List<TopicAndPartitionData<PartitionData>> topicsData = 
TopicAndPartitionData.batchByTopic(fetchData);
-
-        struct.set(REPLICA_ID_KEY_NAME, replicaId);
-        struct.set(MAX_WAIT_KEY_NAME, maxWait);
-        struct.set(MIN_BYTES_KEY_NAME, minBytes);
-        if (version >= 3)
-            struct.set(MAX_BYTES_KEY_NAME, maxBytes);
-        List<Struct> topicArray = new ArrayList<>();
-        for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
-            Struct topicData = struct.instance(TOPICS_KEY_NAME);
-            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
-            List<Struct> partitionArray = new ArrayList<>();
-            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.partitions.entrySet()) {
-                PartitionData fetchPartitionData = partitionEntry.getValue();
-                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
-                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
-                partitionData.set(FETCH_OFFSET_KEY_NAME, 
fetchPartitionData.offset);
-                partitionData.set(MAX_BYTES_KEY_NAME, 
fetchPartitionData.maxBytes);
-                partitionArray.add(partitionData);
-            }
-            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
-            topicArray.add(topicData);
-        }
-        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        super(version);
         this.replicaId = replicaId;
         this.maxWait = maxWait;
         this.minBytes = minBytes;
@@ -183,8 +158,8 @@ public class FetchRequest extends AbstractRequest {
         this.fetchData = fetchData;
     }
 
-    public FetchRequest(Struct struct, short versionId) {
-        super(struct, versionId);
+    public FetchRequest(Struct struct, short version) {
+        super(version);
         replicaId = struct.getInt(REPLICA_ID_KEY_NAME);
         maxWait = struct.getInt(MAX_WAIT_KEY_NAME);
         minBytes = struct.getInt(MIN_BYTES_KEY_NAME);
@@ -217,8 +192,7 @@ public class FetchRequest extends AbstractRequest {
 
             responseData.put(entry.getKey(), partitionResponse);
         }
-        short versionId = version();
-        return new FetchResponse(versionId, responseData, 0);
+        return new FetchResponse(responseData, 0);
     }
 
     public int replicaId() {
@@ -245,11 +219,38 @@ public class FetchRequest extends AbstractRequest {
         return replicaId >= 0;
     }
 
-    public static FetchRequest parse(ByteBuffer buffer, int versionId) {
-        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, 
versionId, buffer), (short) versionId);
+    public static FetchRequest parse(ByteBuffer buffer, short versionId) {
+        return new FetchRequest(ProtoUtils.parseRequest(ApiKeys.FETCH.id, 
versionId, buffer), versionId);
     }
 
-    public static FetchRequest parse(ByteBuffer buffer) {
-        return parse(buffer, ProtoUtils.latestVersion(ApiKeys.FETCH.id));
+    @Override
+    protected Struct toStruct() {
+        short version = version();
+        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.FETCH.id, 
version));
+        List<TopicAndPartitionData<PartitionData>> topicsData = 
TopicAndPartitionData.batchByTopic(fetchData);
+
+        struct.set(REPLICA_ID_KEY_NAME, replicaId);
+        struct.set(MAX_WAIT_KEY_NAME, maxWait);
+        struct.set(MIN_BYTES_KEY_NAME, minBytes);
+        if (version >= 3)
+            struct.set(MAX_BYTES_KEY_NAME, maxBytes);
+        List<Struct> topicArray = new ArrayList<>();
+        for (TopicAndPartitionData<PartitionData> topicEntry : topicsData) {
+            Struct topicData = struct.instance(TOPICS_KEY_NAME);
+            topicData.set(TOPIC_KEY_NAME, topicEntry.topic);
+            List<Struct> partitionArray = new ArrayList<>();
+            for (Map.Entry<Integer, PartitionData> partitionEntry : 
topicEntry.partitions.entrySet()) {
+                PartitionData fetchPartitionData = partitionEntry.getValue();
+                Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
+                partitionData.set(PARTITION_KEY_NAME, partitionEntry.getKey());
+                partitionData.set(FETCH_OFFSET_KEY_NAME, 
fetchPartitionData.offset);
+                partitionData.set(MAX_BYTES_KEY_NAME, 
fetchPartitionData.maxBytes);
+                partitionArray.add(partitionData);
+            }
+            topicData.set(PARTITIONS_KEY_NAME, partitionArray.toArray());
+            topicArray.add(topicData);
+        }
+        struct.set(TOPICS_KEY_NAME, topicArray.toArray());
+        return struct;
     }
 }

Reply via email to