Repository: kafka
Updated Branches:
  refs/heads/trunk 5916ef022 -> 015f1d738


http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index 94de4b1..189575e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -23,7 +23,6 @@ import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.CollectionUtils;
 
@@ -138,12 +137,12 @@ public class OffsetFetchResponse extends AbstractResponse {
     }
 
     public static OffsetFetchResponse parse(ByteBuffer buffer, short version) {
-        return new OffsetFetchResponse(ProtoUtils.parseResponse(ApiKeys.OFFSET_FETCH.id, version, buffer));
+        return new OffsetFetchResponse(ApiKeys.OFFSET_FETCH.parseResponse(version, buffer));
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.OFFSET_FETCH.id, version));
+        Struct struct = new Struct(ApiKeys.OFFSET_FETCH.responseSchema(version));
 
         Map<String, Map<Integer, PartitionData>> topicsData = CollectionUtils.groupDataByTopic(responseData);
         List<Struct> topicArray = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index df70e20..7b454e8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -17,7 +17,6 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -107,7 +106,7 @@ public class ProduceRequest extends AbstractRequest {
      */
     @Override
     public Struct toStruct() {
-        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, version()));
+        Struct struct = new Struct(ApiKeys.PRODUCE.requestSchema(version()));
         Map<String, Map<Integer, MemoryRecords>> recordsByTopic = CollectionUtils.groupDataByTopic(partitionRecords);
         struct.set(ACKS_KEY_NAME, acks);
         struct.set(TIMEOUT_KEY_NAME, timeout);
@@ -150,7 +149,7 @@ public class ProduceRequest extends AbstractRequest {
                 return new ProduceResponse(responseMap);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.PRODUCE.id)));
+                        versionId, this.getClass().getSimpleName(), ApiKeys.PRODUCE.latestVersion()));
         }
     }
 
@@ -170,7 +169,7 @@ public class ProduceRequest extends AbstractRequest {
         partitionRecords.clear();
     }
 
-    public static ProduceRequest parse(ByteBuffer buffer, short versionId) {
-        return new ProduceRequest(ProtoUtils.parseRequest(ApiKeys.PRODUCE.id, versionId, buffer), versionId);
+    public static ProduceRequest parse(ByteBuffer buffer, short version) {
+        return new ProduceRequest(ApiKeys.PRODUCE.parseRequest(version, buffer), version);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 7a022af..8234732 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.utils.CollectionUtils;
@@ -107,7 +106,7 @@ public class ProduceResponse extends AbstractResponse {
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version));
+        Struct struct = new Struct(ApiKeys.PRODUCE.responseSchema(version));
 
         Map<String, Map<Integer, PartitionResponse>> responseByTopic = CollectionUtils.groupDataByTopic(responses);
         List<Struct> topicDatas = new ArrayList<>(responseByTopic.size());
@@ -174,6 +173,6 @@ public class ProduceResponse extends AbstractResponse {
     }
 
     public static ProduceResponse parse(ByteBuffer buffer, short version) {
-        return new ProduceResponse(ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, version).read(buffer));
+        return new ProduceResponse(ApiKeys.PRODUCE.responseSchema(version).read(buffer));
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
index a1f3f0e..9cc3f1f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeRequest.java
@@ -24,7 +24,6 @@ import java.util.List;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 
@@ -44,7 +43,7 @@ public class SaslHandshakeRequest extends AbstractRequest {
     private final String mechanism;
 
     public SaslHandshakeRequest(String mechanism) {
-        super(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id));
+        super(ApiKeys.SASL_HANDSHAKE.latestVersion());
         this.mechanism = mechanism;
     }
 
@@ -66,17 +65,17 @@ public class SaslHandshakeRequest extends AbstractRequest {
                 return new SaslHandshakeResponse(Errors.forException(e), enabledMechanisms);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id)));
+                        versionId, this.getClass().getSimpleName(), ApiKeys.SASL_HANDSHAKE.latestVersion()));
         }
     }
 
-    public static SaslHandshakeRequest parse(ByteBuffer buffer, short versionId) {
-        return new SaslHandshakeRequest(ProtoUtils.parseRequest(ApiKeys.SASL_HANDSHAKE.id, versionId, buffer), versionId);
+    public static SaslHandshakeRequest parse(ByteBuffer buffer, short version) {
+        return new SaslHandshakeRequest(ApiKeys.SASL_HANDSHAKE.parseRequest(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SASL_HANDSHAKE.id, version()));
+        Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.requestSchema(version()));
         struct.set(MECHANISM_KEY_NAME, mechanism);
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
index 9d38c6a..e1a4c87 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslHandshakeResponse.java
@@ -25,7 +25,6 @@ import java.util.List;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 
@@ -66,7 +65,7 @@ public class SaslHandshakeResponse extends AbstractResponse {
 
     @Override
     public Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SASL_HANDSHAKE.id, version));
+        Struct struct = new Struct(ApiKeys.SASL_HANDSHAKE.responseSchema(version));
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(ENABLED_MECHANISMS_KEY_NAME, enabledMechanisms.toArray());
         return struct;
@@ -77,7 +76,7 @@ public class SaslHandshakeResponse extends AbstractResponse {
     }
 
     public static SaslHandshakeResponse parse(ByteBuffer buffer, short version) {
-        return new SaslHandshakeResponse(ProtoUtils.parseResponse(ApiKeys.SASL_HANDSHAKE.id, version, buffer));
+        return new SaslHandshakeResponse(ApiKeys.SASL_HANDSHAKE.parseResponse(version, buffer));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
index 91806f1..7b79cd8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaRequest.java
@@ -16,7 +16,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -113,7 +112,7 @@ public class StopReplicaRequest extends AbstractRequest {
                 return new StopReplicaResponse(Errors.NONE, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.STOP_REPLICA.id)));
+                        versionId, this.getClass().getSimpleName(), ApiKeys.STOP_REPLICA.latestVersion()));
         }
     }
 
@@ -133,13 +132,13 @@ public class StopReplicaRequest extends AbstractRequest {
         return partitions;
     }
 
-    public static StopReplicaRequest parse(ByteBuffer buffer, short versionId) {
-        return new StopReplicaRequest(ProtoUtils.parseRequest(ApiKeys.STOP_REPLICA.id, versionId, buffer), versionId);
+    public static StopReplicaRequest parse(ByteBuffer buffer, short version) {
+        return new StopReplicaRequest(ApiKeys.STOP_REPLICA.parseRequest(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.STOP_REPLICA.id, version()));
+        Struct struct = new Struct(ApiKeys.STOP_REPLICA.requestSchema(version()));
 
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
index 5ae5cc1..617b1c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/StopReplicaResponse.java
@@ -16,7 +16,6 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -69,13 +68,13 @@ public class StopReplicaResponse extends AbstractResponse {
         return error;
     }
 
-    public static StopReplicaResponse parse(ByteBuffer buffer, short versionId) {
-        return new StopReplicaResponse(ProtoUtils.parseResponse(ApiKeys.STOP_REPLICA.id, versionId, buffer));
+    public static StopReplicaResponse parse(ByteBuffer buffer, short version) {
+        return new StopReplicaResponse(ApiKeys.STOP_REPLICA.parseResponse(version, buffer));
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.STOP_REPLICA.id, version));
+        Struct struct = new Struct(ApiKeys.STOP_REPLICA.responseSchema(version));
 
         List<Struct> responseDatas = new ArrayList<>(responses.size());
         for (Map.Entry<TopicPartition, Errors> response : responses.entrySet()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
index 7ad5c9a..b55ccff 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupRequest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
 
@@ -108,7 +107,7 @@ public class SyncGroupRequest extends AbstractRequest {
                         ByteBuffer.wrap(new byte[]{}));
             default:
                 throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                        versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.SYNC_GROUP.id)));
+                        versionId, this.getClass().getSimpleName(), ApiKeys.SYNC_GROUP.latestVersion()));
         }
     }
 
@@ -128,13 +127,13 @@ public class SyncGroupRequest extends AbstractRequest {
         return memberId;
     }
 
-    public static SyncGroupRequest parse(ByteBuffer buffer, short versionId) {
-        return new SyncGroupRequest(ProtoUtils.parseRequest(ApiKeys.SYNC_GROUP.id, versionId, buffer), versionId);
+    public static SyncGroupRequest parse(ByteBuffer buffer, short version) {
+        return new SyncGroupRequest(ApiKeys.SYNC_GROUP.parseRequest(version, buffer), version);
     }
 
     @Override
     protected Struct toStruct() {
-        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.SYNC_GROUP.id, version()));
+        Struct struct = new Struct(ApiKeys.SYNC_GROUP.requestSchema(version()));
         struct.set(GROUP_ID_KEY_NAME, groupId);
         struct.set(GENERATION_ID_KEY_NAME, generationId);
         struct.set(MEMBER_ID_KEY_NAME, memberId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index ff198aa..148815c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -62,14 +61,14 @@ public class SyncGroupResponse extends AbstractResponse {
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.SYNC_GROUP.id, version));
+        Struct struct = new Struct(ApiKeys.SYNC_GROUP.responseSchema(version));
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         struct.set(MEMBER_ASSIGNMENT_KEY_NAME, memberState);
         return struct;
     }
 
     public static SyncGroupResponse parse(ByteBuffer buffer, short version) {
-        return new SyncGroupResponse(ProtoUtils.parseResponse(ApiKeys.SYNC_GROUP.id, version, buffer));
+        return new SyncGroupResponse(ApiKeys.SYNC_GROUP.parseResponse(version, buffer));
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
index 8dd852d..98bf83b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataRequest.java
@@ -18,7 +18,6 @@ import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.utils.Utils;
@@ -226,7 +225,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         short version = version();
-        Struct struct = new Struct(ProtoUtils.requestSchema(ApiKeys.UPDATE_METADATA_KEY.id, version));
+        Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.requestSchema(version));
         struct.set(CONTROLLER_ID_KEY_NAME, controllerId);
         struct.set(CONTROLLER_EPOCH_KEY_NAME, controllerEpoch);
 
@@ -288,7 +287,7 @@ public class UpdateMetadataRequest extends AbstractRequest {
             return new UpdateMetadataResponse(Errors.forException(e));
         else
             throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d",
-                    versionId, this.getClass().getSimpleName(), ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)));
+                    versionId, this.getClass().getSimpleName(), ApiKeys.UPDATE_METADATA_KEY.latestVersion()));
     }
 
     public int controllerId() {
@@ -307,9 +306,8 @@ public class UpdateMetadataRequest extends AbstractRequest {
         return liveBrokers;
     }
 
-    public static UpdateMetadataRequest parse(ByteBuffer buffer, short versionId) {
-        return new UpdateMetadataRequest(ProtoUtils.parseRequest(ApiKeys.UPDATE_METADATA_KEY.id, versionId, buffer),
-                versionId);
+    public static UpdateMetadataRequest parse(ByteBuffer buffer, short version) {
+        return new UpdateMetadataRequest(ApiKeys.UPDATE_METADATA_KEY.parseRequest(version, buffer), version);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
index 0032fca..5b8b46d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UpdateMetadataResponse.java
@@ -15,7 +15,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -44,12 +43,12 @@ public class UpdateMetadataResponse extends AbstractResponse {
     }
 
     public static UpdateMetadataResponse parse(ByteBuffer buffer, short version) {
-        return new UpdateMetadataResponse(ProtoUtils.parseResponse(ApiKeys.UPDATE_METADATA_KEY.id, version, buffer));
+        return new UpdateMetadataResponse(ApiKeys.UPDATE_METADATA_KEY.parseResponse(version, buffer));
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new Struct(ProtoUtils.responseSchema(ApiKeys.UPDATE_METADATA_KEY.id, version));
+        Struct struct = new Struct(ApiKeys.UPDATE_METADATA_KEY.responseSchema(version));
         struct.set(ERROR_CODE_KEY_NAME, error.code());
         return struct;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index c89cc24..eb6a259 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
@@ -134,7 +133,7 @@ public class NetworkClientTest {
         networkClient.poll(1, time.milliseconds());
         assertEquals(1, networkClient.inFlightRequestCount());
         ResponseHeader respHeader = new ResponseHeader(request.correlationId());
-        Struct resp = new Struct(ProtoUtils.currentResponseSchema(ApiKeys.PRODUCE.id));
+        Struct resp = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
         resp.set("responses", new Object[0]);
         Struct responseHeaderStruct = respHeader.toStruct();
         int size = responseHeaderStruct.sizeOf() + resp.sizeOf();

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 53c47c8..9f0868f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ApiVersionsResponse.ApiVersion;
 import org.junit.Assert;
@@ -64,8 +63,7 @@ public class NodeApiVersionsTest {
             } else if (apiKey == ApiKeys.DELETE_TOPICS) {
                 versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001));
             } else {
-                versionList.add(new ApiVersion(apiKey.id,
-                        ProtoUtils.oldestVersion(apiKey.id), ProtoUtils.latestVersion(apiKey.id)));
+                versionList.add(new ApiVersion(apiKey));
             }
         }
         NodeApiVersions versions = new NodeApiVersions(versionList);
@@ -80,15 +78,15 @@ public class NodeApiVersionsTest {
             } else {
                 bld.append(apiKey.name).append("(").
                         append(apiKey.id).append("): ");
-                if (ProtoUtils.oldestVersion(apiKey.id) ==
-                        ProtoUtils.latestVersion(apiKey.id)) {
-                    bld.append(ProtoUtils.oldestVersion(apiKey.id));
+                if (apiKey.oldestVersion() ==
+                        apiKey.latestVersion()) {
+                    bld.append(apiKey.oldestVersion());
                 } else {
-                    bld.append(ProtoUtils.oldestVersion(apiKey.id)).
+                    bld.append(apiKey.oldestVersion()).
                             append(" to ").
-                            append(ProtoUtils.latestVersion(apiKey.id));
+                            append(apiKey.latestVersion());
                 }
-                bld.append(" [usable: ").append(ProtoUtils.latestVersion(apiKey.id)).
+                bld.append(" [usable: ").append(apiKey.latestVersion()).
                         append("]");
             }
             prefix = ", ";
@@ -129,7 +127,7 @@ public class NodeApiVersionsTest {
         versionList.add(new ApiVersion((short) 100, (short) 0, (short) 1));
         NodeApiVersions versions =  new NodeApiVersions(versionList);
         for (ApiKeys apiKey: ApiKeys.values()) {
-            assertEquals(ProtoUtils.latestVersion(apiKey.id), versions.usableVersion(apiKey));
+            assertEquals(apiKey.latestVersion(), versions.usableVersion(apiKey));
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
index f177ae6..8917a22 100644
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/protocol/ApiKeysTest.java
@@ -31,4 +31,9 @@ public class ApiKeysTest {
         ApiKeys.forId(10000);
     }
 
+    @Test(expected = IllegalArgumentException.class)
+    public void schemaVersionOutOfRange() {
+        ApiKeys.PRODUCE.requestSchema((short) Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
deleted file mode 100644
index 440ca49..0000000
--- a/clients/src/test/java/org/apache/kafka/common/protocol/ProtoUtilsTest.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.protocol;
-
-import org.junit.Test;
-
-public class ProtoUtilsTest {
-    @Test(expected = IllegalArgumentException.class)
-    public void schemaVersionOutOfRange() {
-        ProtoUtils.requestSchema(ApiKeys.PRODUCE.id, Protocol.REQUESTS[ApiKeys.PRODUCE.id].length);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d0b9639..1367ba1 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -169,7 +168,7 @@ public class RequestResponseTest {
     }
 
     private void checkOlderFetchVersions() throws Exception {
-        int latestVersion = ProtoUtils.latestVersion(ApiKeys.FETCH.id);
+        int latestVersion = ApiKeys.FETCH.latestVersion();
         for (int i = 0; i < latestVersion; ++i) {
             checkErrorResponse(createFetchRequest(i), new UnknownServerException());
             checkRequest(createFetchRequest(i));
@@ -220,11 +219,11 @@ public class RequestResponseTest {
         assertEquals("Throttle time must be zero", 0, 
v0Response.getThrottleTime());
         assertEquals("Throttle time must be 10", 10, 
v1Response.getThrottleTime());
         assertEquals("Throttle time must be 10", 10, 
v2Response.getThrottleTime());
-        assertEquals("Should use schema version 0", 
ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 0),
+        assertEquals("Should use schema version 0", 
ApiKeys.PRODUCE.responseSchema((short) 0),
                 v0Response.toStruct((short) 0).schema());
-        assertEquals("Should use schema version 1", 
ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 1),
+        assertEquals("Should use schema version 1", 
ApiKeys.PRODUCE.responseSchema((short) 1),
                 v1Response.toStruct((short) 1).schema());
-        assertEquals("Should use schema version 2", 
ProtoUtils.responseSchema(ApiKeys.PRODUCE.id, 2),
+        assertEquals("Should use schema version 2", 
ApiKeys.PRODUCE.responseSchema((short) 2),
                 v2Response.toStruct((short) 2).schema());
         assertEquals("Response data does not match", responseData, 
v0Response.responses());
         assertEquals("Response data does not match", responseData, 
v1Response.responses());
@@ -242,9 +241,9 @@ public class RequestResponseTest {
         FetchResponse v1Response = new FetchResponse(responseData, 10);
         assertEquals("Throttle time must be zero", 0, 
v0Response.throttleTimeMs());
         assertEquals("Throttle time must be 10", 10, 
v1Response.throttleTimeMs());
-        assertEquals("Should use schema version 0", 
ProtoUtils.responseSchema(ApiKeys.FETCH.id, 0),
+        assertEquals("Should use schema version 0", 
ApiKeys.FETCH.responseSchema((short) 0),
                 v0Response.toStruct((short) 0).schema());
-        assertEquals("Should use schema version 1", 
ProtoUtils.responseSchema(ApiKeys.FETCH.id, 1),
+        assertEquals("Should use schema version 1", 
ApiKeys.FETCH.responseSchema((short) 1),
                 v1Response.toStruct((short) 1).schema());
         assertEquals("Response data does not match", responseData, 
v0Response.responseData());
         assertEquals("Response data does not match", responseData, 
v1Response.responseData());
@@ -253,7 +252,7 @@ public class RequestResponseTest {
     @Test
     public void verifyFetchResponseFullWrite() throws Exception {
         FetchResponse fetchResponse = createFetchResponse();
-        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, 
ProtoUtils.latestVersion(ApiKeys.FETCH.id),
+        RequestHeader header = new RequestHeader(ApiKeys.FETCH.id, 
ApiKeys.FETCH.latestVersion(),
                 "client", 15);
 
         Send send = fetchResponse.toSend("1", header);
@@ -272,7 +271,7 @@ public class RequestResponseTest {
         assertEquals(header.correlationId(), responseHeader.correlationId());
 
         // read the body
-        Struct responseBody = ProtoUtils.responseSchema(ApiKeys.FETCH.id, 
header.apiVersion()).read(buf);
+        Struct responseBody = 
ApiKeys.FETCH.responseSchema(header.apiVersion()).read(buf);
         assertEquals(fetchResponse.toStruct(header.apiVersion()), 
responseBody);
 
         assertEquals(size, responseHeader.sizeOf() + responseBody.sizeOf());
@@ -281,7 +280,7 @@ public class RequestResponseTest {
     @Test
     public void testControlledShutdownResponse() {
         ControlledShutdownResponse response = 
createControlledShutdownResponse();
-        short version = 
ProtoUtils.latestVersion(ApiKeys.CONTROLLED_SHUTDOWN_KEY.id);
+        short version = ApiKeys.CONTROLLED_SHUTDOWN_KEY.latestVersion();
         Struct struct = response.toStruct(version);
         ByteBuffer buffer = toBuffer(struct);
         ControlledShutdownResponse deserialized = 
ControlledShutdownResponse.parse(buffer, version);
@@ -403,11 +402,11 @@ public class RequestResponseTest {
             Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = 
Collections.singletonMap(
                     new TopicPartition("test", 0),
                     new ListOffsetRequest.PartitionData(1000000L, 10));
-            return ListOffsetRequest.Builder.forConsumer((short) 
0).setOffsetData(offsetData).build((short) version);
+            return 
ListOffsetRequest.Builder.forConsumer(false).setOffsetData(offsetData).build((short)
 version);
         } else if (version == 1) {
             Map<TopicPartition, Long> offsetData = Collections.singletonMap(
                     new TopicPartition("test", 0), 1000000L);
-            return ListOffsetRequest.Builder.forConsumer((short) 
1).setTargetTimes(offsetData).build((short) version);
+            return 
ListOffsetRequest.Builder.forConsumer(true).setTargetTimes(offsetData).build((short)
 version);
         } else {
             throw new IllegalArgumentException("Illegal ListOffsetRequest 
version " + version);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index 3a9e0ce..8a93532 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.ProtoUtils;
 import org.apache.kafka.common.protocol.SecurityProtocol;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -758,8 +757,8 @@ public class SaslAuthenticatorTest {
 
         // Send ApiVersionsRequest and check response
         ApiVersionsResponse versionsResponse = 
sendVersionRequestReceiveResponse(node);
-        assertEquals(ProtoUtils.oldestVersion(ApiKeys.SASL_HANDSHAKE.id), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
-        assertEquals(ProtoUtils.latestVersion(ApiKeys.SASL_HANDSHAKE.id), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
+        assertEquals(ApiKeys.SASL_HANDSHAKE.oldestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).minVersion);
+        assertEquals(ApiKeys.SASL_HANDSHAKE.latestVersion(), 
versionsResponse.apiVersion(ApiKeys.SASL_HANDSHAKE.id).maxVersion);
 
         // Send SaslHandshakeRequest and check response
         SaslHandshakeResponse handshakeResponse = 
sendHandshakeRequestReceiveResponse(node);

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 4f71258..3368c09 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -27,7 +27,7 @@ import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.errors._
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, 
SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests._
 import CreateTopicsRequest.TopicDetails
 import org.apache.kafka.common.security.auth.KafkaPrincipal
@@ -191,12 +191,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new requests.FetchRequest.PartitionData(0, 100))
-    val version = ProtoUtils.latestVersion(ApiKeys.FETCH.id)
+    val version = ApiKeys.FETCH.latestVersion
     requests.FetchRequest.Builder.forReplica(version, 5000, 100, Int.MaxValue, 
partitionMap).build()
   }
 
   private def createListOffsetsRequest = {
-    requests.ListOffsetRequest.Builder.forConsumer(0).setTargetTimes(
+    requests.ListOffsetRequest.Builder.forConsumer(false).setTargetTimes(
       Map(tp -> (0L: java.lang.Long)).asJava).
       build()
   }
@@ -215,7 +215,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     val brokers = Set(new requests.UpdateMetadataRequest.Broker(brokerId,
       Seq(new requests.UpdateMetadataRequest.EndPoint("localhost", 0, 
securityProtocol,
         ListenerName.forSecurityProtocol(securityProtocol))).asJava, 
null)).asJava
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     new requests.UpdateMetadataRequest.Builder(version, brokerId, 
Int.MaxValue, partitionState, brokers).build()
   }
 
@@ -773,7 +773,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   @Test
   def testUnauthorizedDeleteWithoutDescribe() {
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
-    val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)
+    val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
     assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
deleteResponse.errors.asScala.head._2)
   }
@@ -782,7 +782,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testUnauthorizedDeleteWithDescribe() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Describe)), deleteTopicResource)
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
-    val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)
+    val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
 
     assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, 
deleteResponse.errors.asScala.head._2)
@@ -792,7 +792,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   def testDeleteWithWildCardAuth() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, 
Acl.WildCardHost, Delete)), new Resource(Topic, "*"))
     val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS)
-    val version = ProtoUtils.latestVersion(ApiKeys.DELETE_TOPICS.id)
+    val version = ApiKeys.DELETE_TOPICS.latestVersion
     val deleteResponse = DeleteTopicsResponse.parse(response, version)
 
     assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2)

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 448fce1..b82ddf9 100644
--- 
a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import kafka.network.SocketServer
 import kafka.utils.TestUtils
 import org.apache.kafka.common.protocol.types.Struct
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.{CreateTopicsRequest, 
CreateTopicsResponse, MetadataRequest, MetadataResponse}
 import org.junit.Assert.{assertEquals, assertFalse, assertNotNull, assertTrue}
 
@@ -147,9 +147,8 @@ class AbstractCreateTopicsRequestTest extends 
BaseRequestTest {
   }
 
   protected def sendMetadataRequest(request: MetadataRequest, destination: 
SocketServer = anySocketServer): MetadataResponse = {
-    val version = ProtoUtils.latestVersion(ApiKeys.METADATA.id)
     val response = connectAndSend(request, ApiKeys.METADATA, destination = 
destination)
-    MetadataResponse.parse(response, version)
+    MetadataResponse.parse(response, ApiKeys.METADATA.latestVersion)
   }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala 
b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
index 981ed6a..2555a91 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsTest.scala
@@ -18,7 +18,7 @@
 package kafka.server
 
 import org.apache.kafka.common.requests.ApiVersionsResponse
-import org.apache.kafka.common.protocol.{ApiKeys, ProtoUtils, Protocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Protocol}
 import org.junit.Assert._
 import org.junit.Test
 
@@ -32,8 +32,8 @@ class ApiVersionsTest {
     for (key <- ApiKeys.values) {
       val version = 
ApiVersionsResponse.API_VERSIONS_RESPONSE.apiVersion(key.id)
       assertNotNull(s"Could not find ApiVersion for API ${key.name}", version)
-      assertEquals(s"Incorrect min version for Api ${key.name}.", 
version.minVersion, ProtoUtils.oldestVersion(key.id))
-      assertEquals(s"Incorrect max version for Api ${key.name}.", 
version.maxVersion, ProtoUtils.latestVersion(key.id))
+      assertEquals(s"Incorrect min version for Api ${key.name}.", 
version.minVersion, key.oldestVersion)
+      assertEquals(s"Incorrect max version for Api ${key.name}.", 
version.maxVersion, key.latestVersion)
 
       // Check if versions less than min version are indeed set as null, i.e., 
deprecated.
       for (i <- 0 until version.minVersion) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 64be5b3..5616956 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -24,7 +24,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.LogEntry
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.serialization.StringSerializer
@@ -70,7 +70,7 @@ class FetchRequestTest extends BaseRequestTest {
 
   private def sendFetchRequest(leaderId: Int, request: FetchRequest): 
FetchResponse = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = 
brokerSocketServer(leaderId))
-    FetchResponse.parse(response, ProtoUtils.latestVersion(ApiKeys.FETCH.id))
+    FetchResponse.parse(response, ApiKeys.FETCH.latestVersion)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/015f1d73/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 1b5007d..57c1846 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -22,7 +22,7 @@ import util.Arrays.asList
 import kafka.common.BrokerEndPointNotAvailableException
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils, 
SecurityProtocol}
+import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{PartitionState, UpdateMetadataRequest}
 import org.apache.kafka.common.requests.UpdateMetadataRequest.{Broker, 
EndPoint}
 import org.junit.Test
@@ -69,7 +69,7 @@ class MetadataCacheTest {
       new TopicPartition(topic, 1) -> new PartitionState(controllerEpoch, 1, 
1, asList(1), zkVersion, asSet(1)),
       new TopicPartition(topic, 2) -> new PartitionState(controllerEpoch, 2, 
2, asList(2), zkVersion, asSet(2)))
 
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -121,7 +121,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, asList(0), zkVersion, asSet(0)))
 
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -165,7 +165,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, zkVersion, replicas))
 
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -225,7 +225,7 @@ class MetadataCacheTest {
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, zkVersion, replicas))
 
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch,
       partitionStates.asJava, brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -277,7 +277,7 @@ class MetadataCacheTest {
     val isr = asList[Integer](0, 1)
     val partitionStates = Map(
       new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, 3, replicas))
-    val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+    val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
     val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, partitionStates.asJava,
       brokers.asJava).build()
     cache.updateCache(15, updateMetadataRequest)
@@ -310,7 +310,7 @@ class MetadataCacheTest {
       val isr = asList[Integer](0, 1)
       val partitionStates = Map(
         new TopicPartition(topic, 0) -> new PartitionState(controllerEpoch, 
leader, leaderEpoch, isr, 3, replicas))
-      val version = ProtoUtils.latestVersion(ApiKeys.UPDATE_METADATA_KEY.id)
+      val version = ApiKeys.UPDATE_METADATA_KEY.latestVersion
       val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
2, controllerEpoch, partitionStates.asJava,
         brokers.asJava).build()
       cache.updateCache(15, updateMetadataRequest)

Reply via email to