MINOR: Move request/response schemas to the corresponding object representation

This refactor achieves the following:

1. Breaks up the increasingly unmanageable `Protocol` class and moves each 
schema definition into the request/response class that actually uses it.
2. Removes the need for redundant field identifiers maintained separately in 
`Protocol` and the respective request/response objects.
3. Provides a better mechanism for sharing common fields between different 
schemas (e.g. topics, partitions, error codes).
4. Adds convenience helpers to `Struct` for common patterns (such as setting a 
field only if it exists).

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3813 from hachikuji/protocol-schema-refactor


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cf77080
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cf77080
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cf77080

Branch: refs/heads/trunk
Commit: 0cf7708007b01faac5012d939f3c50db274f858d
Parents: a64fe2e
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Sep 19 05:12:55 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Sep 19 05:12:55 2017 +0100

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |    1 +
 .../org/apache/kafka/clients/NetworkClient.java |    5 +-
 .../kafka/clients/consumer/StickyAssignor.java  |   22 +-
 .../apache/kafka/common/protocol/ApiKeys.java   |  249 ++-
 .../kafka/common/protocol/CommonFields.java     |   40 +
 .../kafka/common/protocol/ProtoUtils.java       |    6 +-
 .../apache/kafka/common/protocol/Protocol.java  | 2095 +-----------------
 .../kafka/common/protocol/types/BoundField.java |   37 +
 .../kafka/common/protocol/types/Field.java      |   72 +-
 .../kafka/common/protocol/types/Schema.java     |   46 +-
 .../kafka/common/protocol/types/Struct.java     |  129 +-
 .../kafka/common/requests/AbstractRequest.java  |    4 +-
 .../kafka/common/requests/AbstractResponse.java |    5 +-
 .../common/requests/AddOffsetsToTxnRequest.java |   16 +
 .../requests/AddOffsetsToTxnResponse.java       |   20 +-
 .../requests/AddPartitionsToTxnRequest.java     |   35 +-
 .../requests/AddPartitionsToTxnResponse.java    |   37 +-
 .../common/requests/AlterConfigsRequest.java    |   26 +
 .../common/requests/AlterConfigsResponse.java   |   27 +-
 .../common/requests/AlterReplicaDirRequest.java |   23 +-
 .../requests/AlterReplicaDirResponse.java       |   40 +-
 .../apache/kafka/common/requests/ApiError.java  |   20 +-
 .../common/requests/ApiVersionsRequest.java     |   10 +
 .../common/requests/ApiVersionsResponse.java    |   72 +-
 .../requests/ControlledShutdownRequest.java     |   20 +-
 .../requests/ControlledShutdownResponse.java    |   41 +-
 .../common/requests/CreateAclsRequest.java      |   31 +-
 .../common/requests/CreateAclsResponse.java     |   29 +-
 .../common/requests/CreateTopicsRequest.java    |   76 +-
 .../common/requests/CreateTopicsResponse.java   |   42 +-
 .../common/requests/DeleteAclsRequest.java      |   22 +
 .../common/requests/DeleteAclsResponse.java     |   55 +-
 .../common/requests/DeleteRecordsRequest.java   |   36 +-
 .../common/requests/DeleteRecordsResponse.java  |   47 +-
 .../common/requests/DeleteTopicsRequest.java    |   19 +
 .../common/requests/DeleteTopicsResponse.java   |   39 +-
 .../common/requests/DescribeAclsRequest.java    |   20 +
 .../common/requests/DescribeAclsResponse.java   |   52 +-
 .../common/requests/DescribeConfigsRequest.java |   18 +
 .../requests/DescribeConfigsResponse.java       |   65 +-
 .../common/requests/DescribeGroupsRequest.java  |   17 +
 .../common/requests/DescribeGroupsResponse.java |   48 +-
 .../common/requests/DescribeLogDirsRequest.java |   20 +-
 .../requests/DescribeLogDirsResponse.java       |   52 +-
 .../kafka/common/requests/EndTxnRequest.java    |   17 +
 .../kafka/common/requests/EndTxnResponse.java   |   20 +-
 .../kafka/common/requests/FetchRequest.java     |  108 +-
 .../kafka/common/requests/FetchResponse.java    |  132 +-
 .../common/requests/FindCoordinatorRequest.java |   17 +
 .../requests/FindCoordinatorResponse.java       |   57 +-
 .../kafka/common/requests/HeartbeatRequest.java |   17 +
 .../common/requests/HeartbeatResponse.java      |   23 +-
 .../common/requests/InitProducerIdRequest.java  |   13 +
 .../common/requests/InitProducerIdResponse.java |   29 +-
 .../kafka/common/requests/JoinGroupRequest.java |   39 +
 .../common/requests/JoinGroupResponse.java      |   70 +-
 .../common/requests/LeaderAndIsrRequest.java    |   64 +-
 .../common/requests/LeaderAndIsrResponse.java   |   42 +-
 .../common/requests/LeaveGroupRequest.java      |   18 +-
 .../common/requests/LeaveGroupResponse.java     |   23 +-
 .../common/requests/ListGroupsRequest.java      |   14 +-
 .../common/requests/ListGroupsResponse.java     |   47 +-
 .../common/requests/ListOffsetRequest.java      |   58 +-
 .../common/requests/ListOffsetResponse.java     |   65 +-
 .../kafka/common/requests/MetadataRequest.java  |   34 +
 .../kafka/common/requests/MetadataResponse.java |  126 +-
 .../common/requests/OffsetCommitRequest.java    |   73 +-
 .../common/requests/OffsetCommitResponse.java   |   56 +-
 .../common/requests/OffsetFetchRequest.java     |   54 +-
 .../common/requests/OffsetFetchResponse.java    |   79 +-
 .../requests/OffsetsForLeaderEpochRequest.java  |   47 +-
 .../requests/OffsetsForLeaderEpochResponse.java |   56 +-
 .../kafka/common/requests/ProduceRequest.java   |   71 +-
 .../kafka/common/requests/ProduceResponse.java  |   80 +-
 .../kafka/common/requests/RequestContext.java   |    3 +-
 .../kafka/common/requests/RequestHeader.java    |   31 +-
 .../kafka/common/requests/RequestUtils.java     |   59 +-
 .../kafka/common/requests/ResponseHeader.java   |   19 +-
 .../requests/SaslAuthenticateRequest.java       |   15 +-
 .../requests/SaslAuthenticateResponse.java      |   30 +-
 .../common/requests/SaslHandshakeRequest.java   |   22 +-
 .../common/requests/SaslHandshakeResponse.java  |   28 +-
 .../common/requests/StopReplicaRequest.java     |   31 +-
 .../common/requests/StopReplicaResponse.java    |   40 +-
 .../kafka/common/requests/SyncGroupRequest.java |   33 +-
 .../common/requests/SyncGroupResponse.java      |   29 +-
 .../common/requests/TxnOffsetCommitRequest.java |   47 +-
 .../requests/TxnOffsetCommitResponse.java       |   50 +-
 .../common/requests/UpdateMetadataRequest.java  |  174 +-
 .../common/requests/UpdateMetadataResponse.java |   21 +-
 .../common/requests/WriteTxnMarkersRequest.java |   40 +-
 .../requests/WriteTxnMarkersResponse.java       |   49 +-
 .../authenticator/SaslServerAuthenticator.java  |    5 +-
 .../apache/kafka/clients/NetworkClientTest.java |    5 +-
 .../kafka/clients/NodeApiVersionsTest.java      |    2 +-
 .../kafka/common/protocol/ApiKeysTest.java      |   13 +-
 .../kafka/common/protocol/ProtoUtilsTest.java   |    9 +-
 .../types/ProtocolSerializationTest.java        |   20 +-
 .../requests/ApiVersionsResponseTest.java       |    2 +-
 .../common/requests/RequestHeaderTest.java      |    6 +-
 .../common/requests/RequestResponseTest.java    |    2 +-
 .../authenticator/SaslAuthenticatorTest.java    |    7 +-
 .../controller/ControllerChannelManager.scala   |    4 +-
 .../group/GroupMetadataManager.scala            |    2 +-
 .../transaction/TransactionLog.scala            |    2 +-
 .../scala/kafka/log/ProducerStateManager.scala  |    2 +-
 .../scala/kafka/network/RequestChannel.scala    |    4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |    6 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |   18 +-
 .../kafka/server/ApiVersionsRequestTest.scala   |    2 +-
 .../unit/kafka/server/ApiVersionsTest.scala     |   14 +-
 .../unit/kafka/server/MetadataCacheTest.scala   |   12 +-
 .../unit/kafka/server/RequestQuotaTest.scala    |    6 +-
 113 files changed, 3243 insertions(+), 2956 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bea7e20..3329b2d 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -105,6 +105,7 @@
       <allow pkg="org.apache.kafka.common.errors" />
       <allow pkg="org.apache.kafka.common.protocol.types" />
       <allow pkg="org.apache.kafka.common.record" />
+      <allow pkg="org.apache.kafka.common.requests" />
     </subpackage>
 
     <subpackage name="record">

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index f046696..c3c15df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.network.Send;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.CommonFields;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
@@ -570,8 +571,8 @@ public class NetworkClient implements KafkaClient {
         // Always expect the response version id to be the same as the request 
version id
         Struct responseBody = 
requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), 
responseBuffer);
         correlate(requestHeader, responseHeader);
-        if (throttleTimeSensor != null && 
responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
-            
throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME),
 now);
+        if (throttleTimeSensor != null && 
responseBody.hasField(CommonFields.THROTTLE_TIME_MS))
+            
throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS), now);
         return responseBody;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
index d7bfaf1..247b619 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java
@@ -16,6 +16,17 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+import org.apache.kafka.common.utils.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -31,17 +42,6 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeSet;
 
-import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.protocol.types.Type;
-import org.apache.kafka.common.utils.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  * <p>The sticky assignor serves two purposes. First, it guarantees an 
assignment that is as balanced as possible, meaning either:
  * <ul>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index d37eddf..0e087eb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -19,33 +19,117 @@ package org.apache.kafka.common.protocol;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.SchemaException;
 import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
 import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.requests.AddOffsetsToTxnRequest;
+import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
+import org.apache.kafka.common.requests.AddPartitionsToTxnRequest;
+import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
+import org.apache.kafka.common.requests.AlterConfigsRequest;
+import org.apache.kafka.common.requests.AlterConfigsResponse;
+import org.apache.kafka.common.requests.AlterReplicaDirRequest;
+import org.apache.kafka.common.requests.AlterReplicaDirResponse;
+import org.apache.kafka.common.requests.ApiVersionsRequest;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
+import org.apache.kafka.common.requests.ControlledShutdownRequest;
+import org.apache.kafka.common.requests.ControlledShutdownResponse;
+import org.apache.kafka.common.requests.CreateAclsRequest;
+import org.apache.kafka.common.requests.CreateAclsResponse;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteAclsRequest;
+import org.apache.kafka.common.requests.DeleteAclsResponse;
+import org.apache.kafka.common.requests.DeleteRecordsRequest;
+import org.apache.kafka.common.requests.DeleteRecordsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.DescribeAclsRequest;
+import org.apache.kafka.common.requests.DescribeAclsResponse;
+import org.apache.kafka.common.requests.DescribeConfigsRequest;
+import org.apache.kafka.common.requests.DescribeConfigsResponse;
+import org.apache.kafka.common.requests.DescribeGroupsRequest;
+import org.apache.kafka.common.requests.DescribeGroupsResponse;
+import org.apache.kafka.common.requests.DescribeLogDirsRequest;
+import org.apache.kafka.common.requests.DescribeLogDirsResponse;
+import org.apache.kafka.common.requests.EndTxnRequest;
+import org.apache.kafka.common.requests.EndTxnResponse;
+import org.apache.kafka.common.requests.FetchRequest;
+import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.FindCoordinatorRequest;
+import org.apache.kafka.common.requests.FindCoordinatorResponse;
+import org.apache.kafka.common.requests.HeartbeatRequest;
+import org.apache.kafka.common.requests.HeartbeatResponse;
+import org.apache.kafka.common.requests.InitProducerIdRequest;
+import org.apache.kafka.common.requests.InitProducerIdResponse;
+import org.apache.kafka.common.requests.JoinGroupRequest;
+import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.LeaderAndIsrRequest;
+import org.apache.kafka.common.requests.LeaderAndIsrResponse;
+import org.apache.kafka.common.requests.LeaveGroupRequest;
+import org.apache.kafka.common.requests.LeaveGroupResponse;
+import org.apache.kafka.common.requests.ListGroupsRequest;
+import org.apache.kafka.common.requests.ListGroupsResponse;
+import org.apache.kafka.common.requests.ListOffsetRequest;
+import org.apache.kafka.common.requests.ListOffsetResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetCommitRequest;
+import org.apache.kafka.common.requests.OffsetCommitResponse;
+import org.apache.kafka.common.requests.OffsetFetchRequest;
+import org.apache.kafka.common.requests.OffsetFetchResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
+import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.SaslAuthenticateRequest;
+import org.apache.kafka.common.requests.SaslAuthenticateResponse;
+import org.apache.kafka.common.requests.SaslHandshakeRequest;
+import org.apache.kafka.common.requests.SaslHandshakeResponse;
+import org.apache.kafka.common.requests.StopReplicaRequest;
+import org.apache.kafka.common.requests.StopReplicaResponse;
+import org.apache.kafka.common.requests.SyncGroupRequest;
+import org.apache.kafka.common.requests.SyncGroupResponse;
+import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
+import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
+import org.apache.kafka.common.requests.UpdateMetadataRequest;
+import org.apache.kafka.common.requests.UpdateMetadataResponse;
+import org.apache.kafka.common.requests.WriteTxnMarkersRequest;
+import org.apache.kafka.common.requests.WriteTxnMarkersResponse;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.apache.kafka.common.protocol.types.Type.BYTES;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
+import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 
 /**
  * Identifiers for all the Kafka APIs
  */
 public enum ApiKeys {
-    PRODUCE(0, "Produce"),
-    FETCH(1, "Fetch"),
-    LIST_OFFSETS(2, "Offsets"),
-    METADATA(3, "Metadata"),
-    LEADER_AND_ISR(4, "LeaderAndIsr", true),
-    STOP_REPLICA(5, "StopReplica", true),
-    UPDATE_METADATA_KEY(6, "UpdateMetadata", true),
-    CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true),
-    OFFSET_COMMIT(8, "OffsetCommit"),
-    OFFSET_FETCH(9, "OffsetFetch"),
-    FIND_COORDINATOR(10, "FindCoordinator"),
-    JOIN_GROUP(11, "JoinGroup"),
-    HEARTBEAT(12, "Heartbeat"),
-    LEAVE_GROUP(13, "LeaveGroup"),
-    SYNC_GROUP(14, "SyncGroup"),
-    DESCRIBE_GROUPS(15, "DescribeGroups"),
-    LIST_GROUPS(16, "ListGroups"),
-    SASL_HANDSHAKE(17, "SaslHandshake"),
-    API_VERSIONS(18, "ApiVersions") {
+    PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), 
ProduceResponse.schemaVersions()),
+    FETCH(1, "Fetch", FetchRequest.schemaVersions(), 
FetchResponse.schemaVersions()),
+    LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), 
ListOffsetResponse.schemaVersions()),
+    METADATA(3, "Metadata", MetadataRequest.schemaVersions(), 
MetadataResponse.schemaVersions()),
+    LEADER_AND_ISR(4, "LeaderAndIsr", true, 
LeaderAndIsrRequest.schemaVersions(), LeaderAndIsrResponse.schemaVersions()),
+    STOP_REPLICA(5, "StopReplica", true, StopReplicaRequest.schemaVersions(), 
StopReplicaResponse.schemaVersions()),
+    UPDATE_METADATA(6, "UpdateMetadata", true, 
UpdateMetadataRequest.schemaVersions(),
+            UpdateMetadataResponse.schemaVersions()),
+    CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, 
ControlledShutdownRequest.schemaVersions(),
+            ControlledShutdownResponse.schemaVersions()),
+    OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), 
OffsetCommitResponse.schemaVersions()),
+    OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), 
OffsetFetchResponse.schemaVersions()),
+    FIND_COORDINATOR(10, "FindCoordinator", 
FindCoordinatorRequest.schemaVersions(),
+            FindCoordinatorResponse.schemaVersions()),
+    JOIN_GROUP(11, "JoinGroup", JoinGroupRequest.schemaVersions(), 
JoinGroupResponse.schemaVersions()),
+    HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), 
HeartbeatResponse.schemaVersions()),
+    LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequest.schemaVersions(), 
LeaveGroupResponse.schemaVersions()),
+    SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), 
SyncGroupResponse.schemaVersions()),
+    DESCRIBE_GROUPS(15, "DescribeGroups", 
DescribeGroupsRequest.schemaVersions(),
+            DescribeGroupsResponse.schemaVersions()),
+    LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), 
ListGroupsResponse.schemaVersions()),
+    SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), 
SaslHandshakeResponse.schemaVersions()),
+    API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), 
ApiVersionsResponse.schemaVersions()) {
         @Override
         public Struct parseResponse(short version, ByteBuffer buffer) {
             // Fallback to version 0 for ApiVersions response. If a client 
sends an ApiVersionsRequest
@@ -54,24 +138,37 @@ public enum ApiKeys {
             return parseResponse(version, buffer, (short) 0);
         }
     },
-    CREATE_TOPICS(19, "CreateTopics"),
-    DELETE_TOPICS(20, "DeleteTopics"),
-    DELETE_RECORDS(21, "DeleteRecords"),
-    INIT_PRODUCER_ID(22, "InitProducerId"),
-    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true),
-    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, 
RecordBatch.MAGIC_VALUE_V2),
-    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, 
RecordBatch.MAGIC_VALUE_V2),
-    END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2),
-    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2),
-    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, 
RecordBatch.MAGIC_VALUE_V2),
-    DESCRIBE_ACLS(29, "DescribeAcls"),
-    CREATE_ACLS(30, "CreateAcls"),
-    DELETE_ACLS(31, "DeleteAcls"),
-    DESCRIBE_CONFIGS(32, "DescribeConfigs"),
-    ALTER_CONFIGS(33, "AlterConfigs"),
-    ALTER_REPLICA_DIR(34, "AlterReplicaDir"),
-    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs"),
-    SASL_AUTHENTICATE(36, "SaslAuthenticate");
+    CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequest.schemaVersions(), 
CreateTopicsResponse.schemaVersions()),
+    DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), 
DeleteTopicsResponse.schemaVersions()),
+    DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), 
DeleteRecordsResponse.schemaVersions()),
+    INIT_PRODUCER_ID(22, "InitProducerId", 
InitProducerIdRequest.schemaVersions(),
+            InitProducerIdResponse.schemaVersions()),
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true, 
OffsetsForLeaderEpochRequest.schemaVersions(),
+            OffsetsForLeaderEpochResponse.schemaVersions()),
+    ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, 
RecordBatch.MAGIC_VALUE_V2,
+            AddPartitionsToTxnRequest.schemaVersions(), 
AddPartitionsToTxnResponse.schemaVersions()),
+    ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, 
RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(),
+            AddOffsetsToTxnResponse.schemaVersions()),
+    END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, 
EndTxnRequest.schemaVersions(),
+            EndTxnResponse.schemaVersions()),
+    WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, 
WriteTxnMarkersRequest.schemaVersions(),
+            WriteTxnMarkersResponse.schemaVersions()),
+    TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, 
RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequest.schemaVersions(),
+            TxnOffsetCommitResponse.schemaVersions()),
+    DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequest.schemaVersions(), 
DescribeAclsResponse.schemaVersions()),
+    CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(), 
CreateAclsResponse.schemaVersions()),
+    DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(), 
DeleteAclsResponse.schemaVersions()),
+    DESCRIBE_CONFIGS(32, "DescribeConfigs", 
DescribeConfigsRequest.schemaVersions(),
+            DescribeConfigsResponse.schemaVersions()),
+    ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(),
+            AlterConfigsResponse.schemaVersions()),
+    ALTER_REPLICA_DIR(34, "AlterReplicaDir", 
AlterReplicaDirRequest.schemaVersions(),
+            AlterReplicaDirResponse.schemaVersions()),
+    DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", 
DescribeLogDirsRequest.schemaVersions(),
+            DescribeLogDirsResponse.schemaVersions()),
+    SASL_AUTHENTICATE(36, "SaslAuthenticate", 
SaslAuthenticateRequest.schemaVersions(),
+            SaslAuthenticateResponse.schemaVersions());
+
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;
@@ -100,21 +197,48 @@ public enum ApiKeys {
     /** indicates the minimum required inter broker magic required to support 
the API */
     public final byte minRequiredInterBrokerMagic;
 
-    ApiKeys(int id, String name) {
-        this(id, name, false);
+    public final Schema[] requestSchemas;
+    public final Schema[] responseSchemas;
+    public final boolean requiresDelayedAllocation;
+
+    ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] 
responseSchemas) {
+        this(id, name, false, requestSchemas, responseSchemas);
     }
 
-    ApiKeys(int id, String name, boolean clusterAction) {
-        this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0);
+    ApiKeys(int id, String name, boolean clusterAction, Schema[] 
requestSchemas, Schema[] responseSchemas) {
+        this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, 
requestSchemas, responseSchemas);
     }
 
-    ApiKeys(int id, String name, boolean clusterAction, byte 
minRequiredInterBrokerMagic) {
+    ApiKeys(int id, String name, boolean clusterAction, byte 
minRequiredInterBrokerMagic,
+            Schema[] requestSchemas, Schema[] responseSchemas) {
         if (id < 0)
             throw new IllegalArgumentException("id must not be negative, id: " 
+ id);
         this.id = (short) id;
         this.name = name;
         this.clusterAction = clusterAction;
         this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
+
+        if (requestSchemas.length != responseSchemas.length)
+            throw new IllegalStateException(requestSchemas.length + " request 
versions for api " + name
+                    + " but " + responseSchemas.length + " response 
versions.");
+
+        for (int i = 0; i < requestSchemas.length; ++i) {
+            if (requestSchemas[i] == null)
+                throw new IllegalStateException("Request schema for api " + 
name + " for version " + i + " is null");
+            if (responseSchemas[i] == null)
+                throw new IllegalStateException("Response schema for api " + 
name + " for version " + i + " is null");
+        }
+
+        boolean requestRetainsBufferReference = false;
+        for (Schema requestVersionSchema : requestSchemas) {
+            if (retainsBufferReference(requestVersionSchema)) {
+                requestRetainsBufferReference = true;
+                break;
+            }
+        }
+        this.requiresDelayedAllocation = requestRetainsBufferReference;
+        this.requestSchemas = requestSchemas;
+        this.responseSchemas = responseSchemas;
     }
 
     public static ApiKeys forId(int id) {
@@ -129,23 +253,19 @@ public enum ApiKeys {
     }
 
     public short latestVersion() {
-        if (id >= Protocol.CURR_VERSION.length)
-            throw new IllegalArgumentException("Latest version for API key " + 
this + " is not defined");
-        return Protocol.CURR_VERSION[id];
+        return (short) (requestSchemas.length - 1);
     }
 
     public short oldestVersion() {
-        if (id >= Protocol.MIN_VERSIONS.length)
-            throw new IllegalArgumentException("Oldest version for API key " + 
this + " is not defined");
-        return Protocol.MIN_VERSIONS[id];
+        return 0;
     }
 
     public Schema requestSchema(short version) {
-        return schemaFor(Protocol.REQUESTS, version);
+        return schemaFor(requestSchemas, version);
     }
 
     public Schema responseSchema(short version) {
-        return schemaFor(Protocol.RESPONSES, version);
+        return schemaFor(responseSchemas, version);
     }
 
     public Struct parseRequest(short version, ByteBuffer buffer) {
@@ -169,18 +289,16 @@ public enum ApiKeys {
         }
     }
 
-    private Schema schemaFor(Schema[][] schemas, short version) {
-        if (id > schemas.length)
-            throw new IllegalArgumentException("No schema available for API 
key " + this);
-        if (version < 0 || version > latestVersion())
+    private Schema schemaFor(Schema[] versions, short version) {
+        if (!isVersionSupported(version))
             throw new IllegalArgumentException("Invalid version for API key " 
+ this + ": " + version);
-
-        Schema[] versions = schemas[id];
-        if (versions[version] == null)
-            throw new IllegalArgumentException("Unsupported version for API 
key " + this + ": " + version);
         return versions[version];
     }
 
+    public boolean isVersionSupported(short apiVersion) {
+        return apiVersion >= oldestVersion() && apiVersion <= latestVersion();
+    }
+
     private static String toHtml() {
         final StringBuilder b = new StringBuilder();
         b.append("<table class=\"data-table\"><tbody>\n");
@@ -206,4 +324,19 @@ public enum ApiKeys {
         System.out.println(toHtml());
     }
 
+    private static boolean retainsBufferReference(Schema schema) {
+        final AtomicReference<Boolean> foundBufferReference = new 
AtomicReference<>(Boolean.FALSE);
+        SchemaVisitor detector = new SchemaVisitorAdapter() {
+            @Override
+            public void visit(Type field) {
+                if (field == BYTES || field == NULLABLE_BYTES || field == 
RECORDS) {
+                    foundBufferReference.set(Boolean.TRUE);
+                }
+            }
+        };
+        foundBufferReference.set(Boolean.FALSE);
+        ProtoUtils.walk(schema, detector);
+        return foundBufferReference.get();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
new file mode 100644
index 0000000..e12cde4
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.Field;
+
+public class CommonFields {
+    public static final Field.Int32 THROTTLE_TIME_MS = new 
Field.Int32("throttle_time_ms",
+            "Duration in milliseconds for which the request was throttled due 
to quota violation (Zero if the " +
+                    "request did not violate any quota)", 0);
+    public static final Field.Str TOPIC_NAME = new Field.Str("topic", "Name of 
topic");
+    public static final Field.Int32 PARTITION_ID = new 
Field.Int32("partition", "Topic partition id");
+    public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", 
"Response error code");
+    public static final Field.NullableStr ERROR_MESSAGE = new 
Field.NullableStr("error_message", "Response error message");
+
+    // ACL Apis
+    public static final Field.Int8 RESOURCE_TYPE = new 
Field.Int8("resource_type", "The resource type");
+    public static final Field.Str RESOURCE_NAME = new 
Field.Str("resource_name", "The resource name");
+    public static final Field.NullableStr RESOURCE_NAME_FILTER = new 
Field.NullableStr("resource_name", "The resource name filter");
+    public static final Field.Str PRINCIPAL = new Field.Str("principal", "The 
ACL principal");
+    public static final Field.NullableStr PRINCIPAL_FILTER = new 
Field.NullableStr("principal", "The ACL principal filter");
+    public static final Field.Str HOST = new Field.Str("host", "The ACL host");
+    public static final Field.NullableStr HOST_FILTER = new 
Field.NullableStr("host", "The ACL host filter");
+    public static final Field.Int8 OPERATION = new Field.Int8("operation", 
"The ACL operation");
+    public static final Field.Int8 PERMISSION_TYPE = new 
Field.Int8("permission_type", "The ACL permission type");
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
index 5d39dff..f9be12c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -17,7 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.BoundField;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 
@@ -33,8 +33,8 @@ public class ProtoUtils {
         if (node instanceof Schema) {
             Schema schema = (Schema) node;
             visitor.visit(schema);
-            for (Field f : schema.fields()) {
-                handleNode(f.type, visitor);
+            for (BoundField f : schema.fields()) {
+                handleNode(f.def.type, visitor);
             }
         } else if (node instanceof ArrayOf) {
             ArrayOf array = (ArrayOf) node;

Reply via email to