MINOR: Move request/response schemas to the corresponding object representation
This refactor achieves the following: 1. Breaks up the increasingly unmanageable `Protocol` class and moves schemas closer to their actual usage. 2. Removes the need for redundant field identifiers maintained separately in `Protocol` and the respective request/response objects. 3. Provides a better mechanism for sharing common fields between different schemas (e.g. topics, partitions, error codes, etc.). 4. Adds convenience helpers to `Struct` for common patterns (such as setting a field only if it exists). Author: Jason Gustafson <ja...@confluent.io> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3813 from hachikuji/protocol-schema-refactor Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cf77080 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cf77080 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cf77080 Branch: refs/heads/trunk Commit: 0cf7708007b01faac5012d939f3c50db274f858d Parents: a64fe2e Author: Jason Gustafson <ja...@confluent.io> Authored: Tue Sep 19 05:12:55 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Sep 19 05:12:55 2017 +0100 ---------------------------------------------------------------------- checkstyle/import-control.xml | 1 + .../org/apache/kafka/clients/NetworkClient.java | 5 +- .../kafka/clients/consumer/StickyAssignor.java | 22 +- .../apache/kafka/common/protocol/ApiKeys.java | 249 ++- .../kafka/common/protocol/CommonFields.java | 40 + .../kafka/common/protocol/ProtoUtils.java | 6 +- .../apache/kafka/common/protocol/Protocol.java | 2095 +----------------- .../kafka/common/protocol/types/BoundField.java | 37 + .../kafka/common/protocol/types/Field.java | 72 +- .../kafka/common/protocol/types/Schema.java | 46 +- .../kafka/common/protocol/types/Struct.java | 129 +- .../kafka/common/requests/AbstractRequest.java | 4 +- .../kafka/common/requests/AbstractResponse.java | 5 +- .../common/requests/AddOffsetsToTxnRequest.java | 16 + .../requests/AddOffsetsToTxnResponse.java | 20 +- .../requests/AddPartitionsToTxnRequest.java | 35 +- .../requests/AddPartitionsToTxnResponse.java | 37 +- .../common/requests/AlterConfigsRequest.java | 26 + .../common/requests/AlterConfigsResponse.java | 27 +- .../common/requests/AlterReplicaDirRequest.java | 23 +- .../requests/AlterReplicaDirResponse.java | 40 +- .../apache/kafka/common/requests/ApiError.java | 20 +- .../common/requests/ApiVersionsRequest.java | 10 + .../common/requests/ApiVersionsResponse.java | 72 +- .../requests/ControlledShutdownRequest.java | 20 +- .../requests/ControlledShutdownResponse.java | 41 +- .../common/requests/CreateAclsRequest.java | 31 +- .../common/requests/CreateAclsResponse.java | 29 +- .../common/requests/CreateTopicsRequest.java | 76 +- .../common/requests/CreateTopicsResponse.java | 42 +- .../common/requests/DeleteAclsRequest.java | 22 + .../common/requests/DeleteAclsResponse.java | 55 +- .../common/requests/DeleteRecordsRequest.java | 36 +- .../common/requests/DeleteRecordsResponse.java | 47 +- .../common/requests/DeleteTopicsRequest.java | 19 + .../common/requests/DeleteTopicsResponse.java | 39 +- .../common/requests/DescribeAclsRequest.java | 20 + .../common/requests/DescribeAclsResponse.java | 52 +- .../common/requests/DescribeConfigsRequest.java | 18 + .../requests/DescribeConfigsResponse.java | 65 +- .../common/requests/DescribeGroupsRequest.java | 17 + .../common/requests/DescribeGroupsResponse.java | 48 +- .../common/requests/DescribeLogDirsRequest.java | 20 +- .../requests/DescribeLogDirsResponse.java | 52 +- .../kafka/common/requests/EndTxnRequest.java | 17 + .../kafka/common/requests/EndTxnResponse.java | 20 +- .../kafka/common/requests/FetchRequest.java | 108 +- .../kafka/common/requests/FetchResponse.java | 132 +- .../common/requests/FindCoordinatorRequest.java | 17 + .../requests/FindCoordinatorResponse.java | 57 +- .../kafka/common/requests/HeartbeatRequest.java | 17 + .../common/requests/HeartbeatResponse.java | 23 +- .../common/requests/InitProducerIdRequest.java | 13 + .../common/requests/InitProducerIdResponse.java | 29 +- .../kafka/common/requests/JoinGroupRequest.java | 39 + .../common/requests/JoinGroupResponse.java | 70 +- .../common/requests/LeaderAndIsrRequest.java | 64 +- .../common/requests/LeaderAndIsrResponse.java | 42 +- .../common/requests/LeaveGroupRequest.java | 18 +- .../common/requests/LeaveGroupResponse.java | 23 +- .../common/requests/ListGroupsRequest.java | 14 +- .../common/requests/ListGroupsResponse.java | 47 +- .../common/requests/ListOffsetRequest.java | 58 +- .../common/requests/ListOffsetResponse.java | 65 +- .../kafka/common/requests/MetadataRequest.java | 34 + .../kafka/common/requests/MetadataResponse.java | 126 +- .../common/requests/OffsetCommitRequest.java | 73 +- .../common/requests/OffsetCommitResponse.java | 56 +- .../common/requests/OffsetFetchRequest.java | 54 +- .../common/requests/OffsetFetchResponse.java | 79 +- .../requests/OffsetsForLeaderEpochRequest.java | 47 +- .../requests/OffsetsForLeaderEpochResponse.java | 56 +- .../kafka/common/requests/ProduceRequest.java | 71 +- .../kafka/common/requests/ProduceResponse.java | 80 +- .../kafka/common/requests/RequestContext.java | 3 +- .../kafka/common/requests/RequestHeader.java | 31 +- .../kafka/common/requests/RequestUtils.java | 59 +- .../kafka/common/requests/ResponseHeader.java | 19 +- .../requests/SaslAuthenticateRequest.java | 15 +- .../requests/SaslAuthenticateResponse.java | 30 +- .../common/requests/SaslHandshakeRequest.java | 22 +- .../common/requests/SaslHandshakeResponse.java | 28 +- .../common/requests/StopReplicaRequest.java | 31 +- .../common/requests/StopReplicaResponse.java | 40 +- .../kafka/common/requests/SyncGroupRequest.java | 33 +- .../common/requests/SyncGroupResponse.java | 29 +- .../common/requests/TxnOffsetCommitRequest.java | 47 +- .../requests/TxnOffsetCommitResponse.java | 50 +- .../common/requests/UpdateMetadataRequest.java | 174 +- .../common/requests/UpdateMetadataResponse.java | 21 +- .../common/requests/WriteTxnMarkersRequest.java | 40 +- .../requests/WriteTxnMarkersResponse.java | 49 +- .../authenticator/SaslServerAuthenticator.java | 5 +- .../apache/kafka/clients/NetworkClientTest.java | 5 +- .../kafka/clients/NodeApiVersionsTest.java | 2 +- .../kafka/common/protocol/ApiKeysTest.java | 13 +- .../kafka/common/protocol/ProtoUtilsTest.java | 9 +- .../types/ProtocolSerializationTest.java | 20 +- .../requests/ApiVersionsResponseTest.java | 2 +- .../common/requests/RequestHeaderTest.java | 6 +- .../common/requests/RequestResponseTest.java | 2 +- .../authenticator/SaslAuthenticatorTest.java | 7 +- .../controller/ControllerChannelManager.scala | 4 +- .../group/GroupMetadataManager.scala | 2 +- .../transaction/TransactionLog.scala | 2 +- .../scala/kafka/log/ProducerStateManager.scala | 2 +- .../scala/kafka/network/RequestChannel.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 6 +- .../kafka/api/AuthorizerIntegrationTest.scala | 18 +- .../kafka/server/ApiVersionsRequestTest.scala | 2 +- .../unit/kafka/server/ApiVersionsTest.scala | 14 +- .../unit/kafka/server/MetadataCacheTest.scala | 12 +- .../unit/kafka/server/RequestQuotaTest.scala | 6 +- 113 files changed, 3243 insertions(+), 2956 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index bea7e20..3329b2d 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -105,6 +105,7 @@ <allow pkg="org.apache.kafka.common.errors" /> <allow pkg="org.apache.kafka.common.protocol.types" /> <allow pkg="org.apache.kafka.common.record" /> + <allow pkg="org.apache.kafka.common.requests" /> </subpackage> <subpackage name="record"> http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index f046696..c3c15df 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.network.Selectable; import org.apache.kafka.common.network.Send; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.CommonFields; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; @@ -570,8 +571,8 @@ public class NetworkClient implements KafkaClient { // Always expect the response version id to be the same as the request version id Struct responseBody = requestHeader.apiKey().parseResponse(requestHeader.apiVersion(), responseBuffer); correlate(requestHeader, responseHeader); - if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME)) - throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now); + if (throttleTimeSensor != null && responseBody.hasField(CommonFields.THROTTLE_TIME_MS)) + throttleTimeSensor.record(responseBody.get(CommonFields.THROTTLE_TIME_MS), now); return responseBody; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java index d7bfaf1..247b619 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/StickyAssignor.java @@ -16,6 +16,17 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.protocol.types.ArrayOf; +import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; +import org.apache.kafka.common.utils.CollectionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.Serializable; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -31,17 +42,6 @@ import java.util.Map.Entry; import java.util.Set; import java.util.TreeSet; -import org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.protocol.types.Type; -import org.apache.kafka.common.utils.CollectionUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * <p>The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either: * <ul> http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index d37eddf..0e087eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -19,33 +19,117 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; +import org.apache.kafka.common.protocol.types.Type; import org.apache.kafka.common.record.RecordBatch; +import org.apache.kafka.common.requests.AddOffsetsToTxnRequest; +import org.apache.kafka.common.requests.AddOffsetsToTxnResponse; +import org.apache.kafka.common.requests.AddPartitionsToTxnRequest; +import org.apache.kafka.common.requests.AddPartitionsToTxnResponse; +import org.apache.kafka.common.requests.AlterConfigsRequest; +import org.apache.kafka.common.requests.AlterConfigsResponse; +import org.apache.kafka.common.requests.AlterReplicaDirRequest; +import org.apache.kafka.common.requests.AlterReplicaDirResponse; +import org.apache.kafka.common.requests.ApiVersionsRequest; +import org.apache.kafka.common.requests.ApiVersionsResponse; +import org.apache.kafka.common.requests.ControlledShutdownRequest; +import org.apache.kafka.common.requests.ControlledShutdownResponse; +import org.apache.kafka.common.requests.CreateAclsRequest; +import org.apache.kafka.common.requests.CreateAclsResponse; +import org.apache.kafka.common.requests.CreateTopicsRequest; +import org.apache.kafka.common.requests.CreateTopicsResponse; +import org.apache.kafka.common.requests.DeleteAclsRequest; +import org.apache.kafka.common.requests.DeleteAclsResponse; +import org.apache.kafka.common.requests.DeleteRecordsRequest; +import org.apache.kafka.common.requests.DeleteRecordsResponse; +import org.apache.kafka.common.requests.DeleteTopicsRequest; +import org.apache.kafka.common.requests.DeleteTopicsResponse; +import org.apache.kafka.common.requests.DescribeAclsRequest; +import org.apache.kafka.common.requests.DescribeAclsResponse; +import org.apache.kafka.common.requests.DescribeConfigsRequest; +import org.apache.kafka.common.requests.DescribeConfigsResponse; +import org.apache.kafka.common.requests.DescribeGroupsRequest; +import org.apache.kafka.common.requests.DescribeGroupsResponse; +import org.apache.kafka.common.requests.DescribeLogDirsRequest; +import org.apache.kafka.common.requests.DescribeLogDirsResponse; +import org.apache.kafka.common.requests.EndTxnRequest; +import org.apache.kafka.common.requests.EndTxnResponse; +import org.apache.kafka.common.requests.FetchRequest; +import org.apache.kafka.common.requests.FetchResponse; +import org.apache.kafka.common.requests.FindCoordinatorRequest; +import org.apache.kafka.common.requests.FindCoordinatorResponse; +import org.apache.kafka.common.requests.HeartbeatRequest; +import org.apache.kafka.common.requests.HeartbeatResponse; +import org.apache.kafka.common.requests.InitProducerIdRequest; +import org.apache.kafka.common.requests.InitProducerIdResponse; +import org.apache.kafka.common.requests.JoinGroupRequest; +import org.apache.kafka.common.requests.JoinGroupResponse; +import org.apache.kafka.common.requests.LeaderAndIsrRequest; +import org.apache.kafka.common.requests.LeaderAndIsrResponse; +import org.apache.kafka.common.requests.LeaveGroupRequest; +import org.apache.kafka.common.requests.LeaveGroupResponse; +import org.apache.kafka.common.requests.ListGroupsRequest; +import org.apache.kafka.common.requests.ListGroupsResponse; +import org.apache.kafka.common.requests.ListOffsetRequest; +import org.apache.kafka.common.requests.ListOffsetResponse; +import org.apache.kafka.common.requests.MetadataRequest; +import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.OffsetCommitRequest; +import org.apache.kafka.common.requests.OffsetCommitResponse; +import org.apache.kafka.common.requests.OffsetFetchRequest; +import org.apache.kafka.common.requests.OffsetFetchResponse; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest; +import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse; +import org.apache.kafka.common.requests.ProduceRequest; +import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.requests.SaslAuthenticateRequest; +import org.apache.kafka.common.requests.SaslAuthenticateResponse; +import org.apache.kafka.common.requests.SaslHandshakeRequest; +import org.apache.kafka.common.requests.SaslHandshakeResponse; +import org.apache.kafka.common.requests.StopReplicaRequest; +import org.apache.kafka.common.requests.StopReplicaResponse; +import org.apache.kafka.common.requests.SyncGroupRequest; +import org.apache.kafka.common.requests.SyncGroupResponse; +import org.apache.kafka.common.requests.TxnOffsetCommitRequest; +import org.apache.kafka.common.requests.TxnOffsetCommitResponse; +import org.apache.kafka.common.requests.UpdateMetadataRequest; +import org.apache.kafka.common.requests.UpdateMetadataResponse; +import org.apache.kafka.common.requests.WriteTxnMarkersRequest; +import org.apache.kafka.common.requests.WriteTxnMarkersResponse; import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.kafka.common.protocol.types.Type.BYTES; +import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES; +import static org.apache.kafka.common.protocol.types.Type.RECORDS; /** * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "Produce"), - FETCH(1, "Fetch"), - LIST_OFFSETS(2, "Offsets"), - METADATA(3, "Metadata"), - LEADER_AND_ISR(4, "LeaderAndIsr", true), - STOP_REPLICA(5, "StopReplica", true), - UPDATE_METADATA_KEY(6, "UpdateMetadata", true), - CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true), - OFFSET_COMMIT(8, "OffsetCommit"), - OFFSET_FETCH(9, "OffsetFetch"), - FIND_COORDINATOR(10, "FindCoordinator"), - JOIN_GROUP(11, "JoinGroup"), - HEARTBEAT(12, "Heartbeat"), - LEAVE_GROUP(13, "LeaveGroup"), - SYNC_GROUP(14, "SyncGroup"), - DESCRIBE_GROUPS(15, "DescribeGroups"), - LIST_GROUPS(16, "ListGroups"), - SASL_HANDSHAKE(17, "SaslHandshake"), - API_VERSIONS(18, "ApiVersions") { + PRODUCE(0, "Produce", ProduceRequest.schemaVersions(), ProduceResponse.schemaVersions()), + FETCH(1, "Fetch", FetchRequest.schemaVersions(), FetchResponse.schemaVersions()), + LIST_OFFSETS(2, "ListOffsets", ListOffsetRequest.schemaVersions(), ListOffsetResponse.schemaVersions()), + METADATA(3, "Metadata", MetadataRequest.schemaVersions(), MetadataResponse.schemaVersions()), + LEADER_AND_ISR(4, "LeaderAndIsr", true, LeaderAndIsrRequest.schemaVersions(), LeaderAndIsrResponse.schemaVersions()), + STOP_REPLICA(5, "StopReplica", true, StopReplicaRequest.schemaVersions(), StopReplicaResponse.schemaVersions()), + UPDATE_METADATA(6, "UpdateMetadata", true, UpdateMetadataRequest.schemaVersions(), + UpdateMetadataResponse.schemaVersions()), + CONTROLLED_SHUTDOWN(7, "ControlledShutdown", true, ControlledShutdownRequest.schemaVersions(), + ControlledShutdownResponse.schemaVersions()), + OFFSET_COMMIT(8, "OffsetCommit", OffsetCommitRequest.schemaVersions(), OffsetCommitResponse.schemaVersions()), + OFFSET_FETCH(9, "OffsetFetch", OffsetFetchRequest.schemaVersions(), OffsetFetchResponse.schemaVersions()), + FIND_COORDINATOR(10, "FindCoordinator", FindCoordinatorRequest.schemaVersions(), + FindCoordinatorResponse.schemaVersions()), + JOIN_GROUP(11, "JoinGroup", JoinGroupRequest.schemaVersions(), JoinGroupResponse.schemaVersions()), + HEARTBEAT(12, "Heartbeat", HeartbeatRequest.schemaVersions(), HeartbeatResponse.schemaVersions()), + LEAVE_GROUP(13, "LeaveGroup", LeaveGroupRequest.schemaVersions(), LeaveGroupResponse.schemaVersions()), + SYNC_GROUP(14, "SyncGroup", SyncGroupRequest.schemaVersions(), SyncGroupResponse.schemaVersions()), + DESCRIBE_GROUPS(15, "DescribeGroups", DescribeGroupsRequest.schemaVersions(), + DescribeGroupsResponse.schemaVersions()), + LIST_GROUPS(16, "ListGroups", ListGroupsRequest.schemaVersions(), ListGroupsResponse.schemaVersions()), + SASL_HANDSHAKE(17, "SaslHandshake", SaslHandshakeRequest.schemaVersions(), SaslHandshakeResponse.schemaVersions()), + API_VERSIONS(18, "ApiVersions", ApiVersionsRequest.schemaVersions(), ApiVersionsResponse.schemaVersions()) { @Override public Struct parseResponse(short version, ByteBuffer buffer) { // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest @@ -54,24 +138,37 @@ public enum ApiKeys { return parseResponse(version, buffer, (short) 0); } }, - CREATE_TOPICS(19, "CreateTopics"), - DELETE_TOPICS(20, "DeleteTopics"), - DELETE_RECORDS(21, "DeleteRecords"), - INIT_PRODUCER_ID(22, "InitProducerId"), - OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true), - ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2), - ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2), - END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2), - WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2), - TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2), - DESCRIBE_ACLS(29, "DescribeAcls"), - CREATE_ACLS(30, "CreateAcls"), - DELETE_ACLS(31, "DeleteAcls"), - DESCRIBE_CONFIGS(32, "DescribeConfigs"), - ALTER_CONFIGS(33, "AlterConfigs"), - ALTER_REPLICA_DIR(34, "AlterReplicaDir"), - DESCRIBE_LOG_DIRS(35, "DescribeLogDirs"), - SASL_AUTHENTICATE(36, "SaslAuthenticate"); + CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequest.schemaVersions(), CreateTopicsResponse.schemaVersions()), + DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()), + DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()), + INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(), + InitProducerIdResponse.schemaVersions()), + OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true, OffsetsForLeaderEpochRequest.schemaVersions(), + OffsetsForLeaderEpochResponse.schemaVersions()), + ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false, RecordBatch.MAGIC_VALUE_V2, + AddPartitionsToTxnRequest.schemaVersions(), AddPartitionsToTxnResponse.schemaVersions()), + ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false, RecordBatch.MAGIC_VALUE_V2, AddOffsetsToTxnRequest.schemaVersions(), + AddOffsetsToTxnResponse.schemaVersions()), + END_TXN(26, "EndTxn", false, RecordBatch.MAGIC_VALUE_V2, EndTxnRequest.schemaVersions(), + EndTxnResponse.schemaVersions()), + WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true, RecordBatch.MAGIC_VALUE_V2, WriteTxnMarkersRequest.schemaVersions(), + WriteTxnMarkersResponse.schemaVersions()), + TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false, RecordBatch.MAGIC_VALUE_V2, TxnOffsetCommitRequest.schemaVersions(), + TxnOffsetCommitResponse.schemaVersions()), + DESCRIBE_ACLS(29, "DescribeAcls", DescribeAclsRequest.schemaVersions(), DescribeAclsResponse.schemaVersions()), + CREATE_ACLS(30, "CreateAcls", CreateAclsRequest.schemaVersions(), CreateAclsResponse.schemaVersions()), + DELETE_ACLS(31, "DeleteAcls", DeleteAclsRequest.schemaVersions(), DeleteAclsResponse.schemaVersions()), + DESCRIBE_CONFIGS(32, "DescribeConfigs", DescribeConfigsRequest.schemaVersions(), + DescribeConfigsResponse.schemaVersions()), + ALTER_CONFIGS(33, "AlterConfigs", AlterConfigsRequest.schemaVersions(), + AlterConfigsResponse.schemaVersions()), + ALTER_REPLICA_DIR(34, "AlterReplicaDir", AlterReplicaDirRequest.schemaVersions(), + AlterReplicaDirResponse.schemaVersions()), + DESCRIBE_LOG_DIRS(35, "DescribeLogDirs", DescribeLogDirsRequest.schemaVersions(), + DescribeLogDirsResponse.schemaVersions()), + SASL_AUTHENTICATE(36, "SaslAuthenticate", SaslAuthenticateRequest.schemaVersions(), + SaslAuthenticateResponse.schemaVersions()); + private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; @@ -100,21 +197,48 @@ public enum ApiKeys { /** indicates the minimum required inter broker magic required to support the API */ public final byte minRequiredInterBrokerMagic; - ApiKeys(int id, String name) { - this(id, name, false); + public final Schema[] requestSchemas; + public final Schema[] responseSchemas; + public final boolean requiresDelayedAllocation; + + ApiKeys(int id, String name, Schema[] requestSchemas, Schema[] responseSchemas) { + this(id, name, false, requestSchemas, responseSchemas); } - ApiKeys(int id, String name, boolean clusterAction) { - this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0); + ApiKeys(int id, String name, boolean clusterAction, Schema[] requestSchemas, Schema[] responseSchemas) { + this(id, name, clusterAction, RecordBatch.MAGIC_VALUE_V0, requestSchemas, responseSchemas); } - ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic) { + ApiKeys(int id, String name, boolean clusterAction, byte minRequiredInterBrokerMagic, + Schema[] requestSchemas, Schema[] responseSchemas) { if (id < 0) throw new IllegalArgumentException("id must not be negative, id: " + id); this.id = (short) id; this.name = name; this.clusterAction = clusterAction; this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic; + + if (requestSchemas.length != responseSchemas.length) + throw new IllegalStateException(requestSchemas.length + " request versions for api " + name + + " but " + responseSchemas.length + " response versions."); + + for (int i = 0; i < requestSchemas.length; ++i) { + if (requestSchemas[i] == null) + throw new IllegalStateException("Request schema for api " + name + " for version " + i + " is null"); + if (responseSchemas[i] == null) + throw new IllegalStateException("Response schema for api " + name + " for version " + i + " is null"); + } + + boolean requestRetainsBufferReference = false; + for (Schema requestVersionSchema : requestSchemas) { + if (retainsBufferReference(requestVersionSchema)) { + requestRetainsBufferReference = true; + break; + } + } + this.requiresDelayedAllocation = requestRetainsBufferReference; + this.requestSchemas = requestSchemas; + this.responseSchemas = responseSchemas; } public static ApiKeys forId(int id) { @@ -129,23 +253,19 @@ public enum ApiKeys { } public short latestVersion() { - if (id >= Protocol.CURR_VERSION.length) - throw new IllegalArgumentException("Latest version for API key " + this + " is not defined"); - return Protocol.CURR_VERSION[id]; + return (short) (requestSchemas.length - 1); } public short oldestVersion() { - if (id >= Protocol.MIN_VERSIONS.length) - throw new IllegalArgumentException("Oldest version for API key " + this + " is not defined"); - return Protocol.MIN_VERSIONS[id]; + return 0; } public Schema requestSchema(short version) { - return schemaFor(Protocol.REQUESTS, version); + return schemaFor(requestSchemas, version); } public Schema responseSchema(short version) { - return schemaFor(Protocol.RESPONSES, version); + return schemaFor(responseSchemas, version); } public Struct parseRequest(short version, ByteBuffer buffer) { @@ -169,18 +289,16 @@ public enum ApiKeys { } } - private Schema schemaFor(Schema[][] schemas, short version) { - if (id > schemas.length) - throw new IllegalArgumentException("No schema available for API key " + this); - if (version < 0 || version > latestVersion()) + private Schema schemaFor(Schema[] versions, short version) { + if (!isVersionSupported(version)) throw new IllegalArgumentException("Invalid version for API key " + this + ": " + version); - - Schema[] versions = schemas[id]; - if (versions[version] == null) - throw new IllegalArgumentException("Unsupported version for API key " + this + ": " + version); return versions[version]; } + public boolean isVersionSupported(short apiVersion) { + return apiVersion >= oldestVersion() && apiVersion <= latestVersion(); + } + private static String toHtml() { final StringBuilder b = new StringBuilder(); b.append("<table class=\"data-table\"><tbody>\n"); @@ -206,4 +324,19 @@ public enum ApiKeys { System.out.println(toHtml()); } + private static boolean retainsBufferReference(Schema schema) { + final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE); + SchemaVisitor detector = new SchemaVisitorAdapter() { + @Override + public void visit(Type field) { + if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) { + foundBufferReference.set(Boolean.TRUE); + } + } + }; + foundBufferReference.set(Boolean.FALSE); + ProtoUtils.walk(schema, detector); + return foundBufferReference.get(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java new file mode 100644 index 0000000..e12cde4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.protocol.types.Field; + +public class CommonFields { + public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms", + "Duration in milliseconds for which the request was throttled due to quota violation (Zero if the " + + "request did not violate any quota)", 0); + public static final Field.Str TOPIC_NAME = new Field.Str("topic", "Name of topic"); + public static final Field.Int32 PARTITION_ID = new Field.Int32("partition", "Topic partition id"); + public static final Field.Int16 ERROR_CODE = new Field.Int16("error_code", "Response error code"); + public static final Field.NullableStr ERROR_MESSAGE = new Field.NullableStr("error_message", "Response error message"); + + // ACL Apis + public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type"); + public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name"); + public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter"); + public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal"); + public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter"); + public static final Field.Str HOST = new Field.Str("host", "The ACL host"); + public static final Field.NullableStr HOST_FILTER = new Field.NullableStr("host", "The ACL host filter"); + public static final Field.Int8 OPERATION = new Field.Int8("operation", "The ACL operation"); + public static final Field.Int8 PERMISSION_TYPE = new Field.Int8("permission_type", "The ACL permission type"); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/0cf77080/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java index 5d39dff..f9be12c 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java @@ -17,7 +17,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; +import org.apache.kafka.common.protocol.types.BoundField; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Type; @@ -33,8 +33,8 @@ public class ProtoUtils { if (node instanceof Schema) { Schema schema = (Schema) node; visitor.visit(schema); - for (Field f : schema.fields()) { - handleNode(f.type, visitor); + for (BoundField f : schema.fields()) { + handleNode(f.def.type, visitor); } } else if (node instanceof ArrayOf) { ArrayOf array = (ArrayOf) node;