KAFKA-4954; Request handler utilization quotas. See KIP-124 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-124+-+Request+rate+quotas) for details.
Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jun Rao <jun...@gmail.com> Closes #2744 from rajinisivaram/KAFKA-4954 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0104b657 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0104b657 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0104b657 Branch: refs/heads/trunk Commit: 0104b657a154fb15e716d872a0e6084f9da650bf Parents: 6185bc0 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Mon May 1 09:13:31 2017 -0700 Committer: Jun Rao <jun...@gmail.com> Committed: Mon May 1 09:13:31 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/NetworkClient.java | 30 +- .../org/apache/kafka/common/metrics/Sensor.java | 9 +- .../kafka/common/network/KafkaChannel.java | 22 +- .../apache/kafka/common/network/Selector.java | 23 + .../apache/kafka/common/protocol/ApiKeys.java | 86 +-- .../apache/kafka/common/protocol/Protocol.java | 212 ++++++-- .../kafka/common/requests/AbstractRequest.java | 9 +- .../kafka/common/requests/AbstractResponse.java | 1 + .../common/requests/AddOffsetsToTxnRequest.java | 4 +- .../requests/AddOffsetsToTxnResponse.java | 11 +- .../requests/AddPartitionsToTxnRequest.java | 4 +- .../requests/AddPartitionsToTxnResponse.java | 11 +- .../common/requests/ApiVersionsRequest.java | 8 +- .../common/requests/ApiVersionsResponse.java | 31 +- .../requests/ControlledShutdownRequest.java | 2 +- .../common/requests/CreateTopicsRequest.java | 4 +- .../common/requests/CreateTopicsResponse.java | 14 + .../common/requests/DeleteRecordsRequest.java | 4 +- .../common/requests/DeleteRecordsResponse.java | 12 +- .../common/requests/DeleteTopicsRequest.java | 4 +- .../common/requests/DeleteTopicsResponse.java | 14 + .../common/requests/DescribeGroupsRequest.java | 4 +- .../common/requests/DescribeGroupsResponse.java | 20 +- 
.../kafka/common/requests/EndTxnRequest.java | 4 +- .../kafka/common/requests/EndTxnResponse.java | 11 +- .../kafka/common/requests/FetchRequest.java | 4 +- .../common/requests/FindCoordinatorRequest.java | 5 +- .../requests/FindCoordinatorResponse.java | 14 + .../kafka/common/requests/HeartbeatRequest.java | 4 +- .../common/requests/HeartbeatResponse.java | 14 + .../kafka/common/requests/InitPidRequest.java | 4 +- .../kafka/common/requests/InitPidResponse.java | 15 +- .../kafka/common/requests/JoinGroupRequest.java | 11 +- .../common/requests/JoinGroupResponse.java | 20 + .../common/requests/LeaderAndIsrRequest.java | 2 +- .../common/requests/LeaveGroupRequest.java | 4 +- .../common/requests/LeaveGroupResponse.java | 14 + .../common/requests/ListGroupsRequest.java | 4 +- .../common/requests/ListGroupsResponse.java | 20 +- .../common/requests/ListOffsetRequest.java | 6 +- .../common/requests/ListOffsetResponse.java | 16 +- .../kafka/common/requests/MetadataRequest.java | 4 +- .../kafka/common/requests/MetadataResponse.java | 14 + .../common/requests/OffsetCommitRequest.java | 5 +- .../common/requests/OffsetCommitResponse.java | 14 + .../common/requests/OffsetFetchRequest.java | 10 +- .../common/requests/OffsetFetchResponse.java | 22 +- .../requests/OffsetsForLeaderEpochRequest.java | 2 +- .../kafka/common/requests/ProduceRequest.java | 4 +- .../kafka/common/requests/ProduceResponse.java | 1 - .../common/requests/SaslHandshakeRequest.java | 25 +- .../common/requests/StopReplicaRequest.java | 2 +- .../kafka/common/requests/SyncGroupRequest.java | 7 +- .../common/requests/SyncGroupResponse.java | 14 + .../common/requests/TxnOffsetCommitRequest.java | 4 +- .../requests/TxnOffsetCommitResponse.java | 11 +- .../common/requests/UpdateMetadataRequest.java | 2 +- .../common/requests/WriteTxnMarkersRequest.java | 2 +- .../apache/kafka/clients/NetworkClientTest.java | 3 +- .../clients/producer/internals/SenderTest.java | 2 +- .../internals/TransactionManagerTest.java | 10 +- 
.../common/network/SslTransportLayerTest.java | 39 ++ .../kafka/common/protocol/ApiKeysTest.java | 31 ++ .../common/requests/RequestResponseTest.java | 10 +- .../scala/kafka/network/RequestChannel.scala | 82 ++- .../main/scala/kafka/network/SocketServer.scala | 31 +- .../scala/kafka/server/ClientQuotaManager.scala | 101 ++-- .../server/ClientRequestQuotaManager.scala | 54 ++ .../main/scala/kafka/server/ConfigHandler.scala | 6 + .../main/scala/kafka/server/DynamicConfig.scala | 5 + .../src/main/scala/kafka/server/KafkaApis.scala | 539 +++++++++++-------- .../kafka/server/KafkaRequestHandler.scala | 6 +- .../main/scala/kafka/server/QuotaFactory.scala | 12 +- .../integration/kafka/api/AdminClientTest.scala | 2 +- .../integration/kafka/api/BaseQuotaTest.scala | 65 ++- .../kafka/api/ClientIdQuotaTest.scala | 5 +- .../kafka/api/UserClientIdQuotaTest.scala | 8 +- .../integration/kafka/api/UserQuotaTest.scala | 8 +- .../kafka/server/ApiVersionsRequestTest.scala | 6 +- .../unit/kafka/server/BaseRequestTest.scala | 2 +- .../kafka/server/ClientQuotaManagerTest.scala | 62 +++ .../unit/kafka/server/RequestQuotaTest.scala | 420 +++++++++++++++ 82 files changed, 1879 insertions(+), 484 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 7bd0311..df9e2fa 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -42,13 +42,12 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.HashSet; +import java.util.HashMap; import 
java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; /** * A network client for asynchronous request/response network i/o. This is an internal class used to implement the @@ -100,7 +99,7 @@ public class NetworkClient implements KafkaClient { private final ApiVersions apiVersions; - private final Set<String> nodesNeedingApiVersionsFetch = new HashSet<>(); + private final Map<String, ApiVersionsRequest.Builder> nodesNeedingApiVersionsFetch = new HashMap<>(); private final List<ClientResponse> abortedSends = new LinkedList<>(); @@ -471,7 +470,7 @@ public class NetworkClient implements KafkaClient { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); // Always expect the response version id to be the same as the request version id ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); - Struct responseBody = apiKey.responseSchema(requestHeader.apiVersion()).read(responseBuffer); + Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer); correlate(requestHeader, responseHeader); return AbstractResponse.getResponse(apiKey, responseBody); } @@ -564,10 +563,14 @@ public class NetworkClient implements KafkaClient { InFlightRequest req, long now, ApiVersionsResponse apiVersionsResponse) { final String node = req.destination; if (apiVersionsResponse.error() != Errors.NONE) { - log.warn("Node {} got error {} when making an ApiVersionsRequest. Disconnecting.", - node, apiVersionsResponse.error()); - this.selector.close(node); - processDisconnection(responses, node, now); + if (req.request.version() == 0 || apiVersionsResponse.error() != Errors.UNSUPPORTED_VERSION) { + log.warn("Node {} got error {} when making an ApiVersionsRequest. 
Disconnecting.", + node, apiVersionsResponse.error()); + this.selector.close(node); + processDisconnection(responses, node, now); + } else { + nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder((short) 0)); + } return; } NodeApiVersions nodeVersionInfo = new NodeApiVersions(apiVersionsResponse.apiVersions()); @@ -605,7 +608,7 @@ public class NetworkClient implements KafkaClient { // connection. if (discoverBrokerVersions) { this.connectionStates.checkingApiVersions(node); - nodesNeedingApiVersionsFetch.add(node); + nodesNeedingApiVersionsFetch.put(node, new ApiVersionsRequest.Builder()); log.debug("Completed connection to node {}. Fetching API versions.", node); } else { this.connectionStates.ready(node); @@ -615,13 +618,14 @@ public class NetworkClient implements KafkaClient { } private void handleInitiateApiVersionRequests(long now) { - Iterator<String> iter = nodesNeedingApiVersionsFetch.iterator(); + Iterator<Map.Entry<String, ApiVersionsRequest.Builder>> iter = nodesNeedingApiVersionsFetch.entrySet().iterator(); while (iter.hasNext()) { - String node = iter.next(); + Map.Entry<String, ApiVersionsRequest.Builder> entry = iter.next(); + String node = entry.getKey(); if (selector.isChannelReady(node) && inFlightRequests.canSendMore(node)) { log.debug("Initiating API versions fetch from node {}.", node); - ApiVersionsRequest.Builder apiVersionRequest = new ApiVersionsRequest.Builder(); - ClientRequest clientRequest = newClientRequest(node, apiVersionRequest, now, true); + ApiVersionsRequest.Builder apiVersionRequestBuilder = entry.getValue(); + ClientRequest clientRequest = newClientRequest(node, apiVersionRequestBuilder, now, true); doSend(clientRequest, true, now); iter.remove(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java index 5ca9fce..ae331e7 100644 --- a/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java +++ b/clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java @@ -168,16 +168,21 @@ public final class Sensor { * bound */ public void record(double value, long timeMs) { + record(value, timeMs, true); + } + + public void record(double value, long timeMs, boolean checkQuotas) { if (shouldRecord()) { this.lastRecordTime = timeMs; synchronized (this) { // increment all the stats for (Stat stat : this.stats) stat.record(config, value, timeMs); - checkQuotas(timeMs); + if (checkQuotas) + checkQuotas(timeMs); } for (Sensor parent : parents) - parent.record(value, timeMs); + parent.record(value, timeMs, checkQuotas); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index f1bf86c..ea03ff0 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -31,6 +31,9 @@ public class KafkaChannel { private final String id; private final TransportLayer transportLayer; private final Authenticator authenticator; + // Tracks accumulated network thread time. This is updated on the network thread. + // The values are read and reset after each response is sent. 
+ private long networkThreadTimeNanos; private final int maxReceiveSize; private NetworkReceive receive; private Send send; @@ -43,6 +46,7 @@ public class KafkaChannel { this.id = id; this.transportLayer = transportLayer; this.authenticator = authenticator; + this.networkThreadTimeNanos = 0L; this.maxReceiveSize = maxReceiveSize; this.disconnected = false; this.muted = false; @@ -164,6 +168,23 @@ public class KafkaChannel { return result; } + /** + * Accumulates network thread time for this channel. + */ + public void addNetworkThreadTimeNanos(long nanos) { + networkThreadTimeNanos += nanos; + } + + /** + * Returns accumulated network thread time for this channel and resets + * the value to zero. + */ + public long getAndResetNetworkThreadTimeNanos() { + long current = networkThreadTimeNanos; + networkThreadTimeNanos = 0; + return current; + } + private long receive(NetworkReceive receive) throws IOException { return receive.readFrom(transportLayer); } @@ -175,5 +196,4 @@ public class KafkaChannel { return send.completed(); } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index fd3ab47..8dd3ad6 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -101,6 +101,7 @@ public class Selector implements Selectable { private final ChannelBuilder channelBuilder; private final int maxReceiveSize; private final boolean metricsPerConnection; + private final boolean recordTimePerConnection; private final IdleExpiryManager idleExpiryManager; /** @@ -122,6 +123,7 @@ public class Selector implements Selectable { String metricGrpPrefix, Map<String, String> metricTags, boolean 
metricsPerConnection, + boolean recordTimePerConnection, ChannelBuilder channelBuilder) { try { this.nioSelector = java.nio.channels.Selector.open(); @@ -144,9 +146,21 @@ public class Selector implements Selectable { this.sensors = new SelectorMetrics(metrics); this.channelBuilder = channelBuilder; this.metricsPerConnection = metricsPerConnection; + this.recordTimePerConnection = recordTimePerConnection; this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs); } + public Selector(int maxReceiveSize, + long connectionMaxIdleMs, + Metrics metrics, + Time time, + String metricGrpPrefix, + Map<String, String> metricTags, + boolean metricsPerConnection, + ChannelBuilder channelBuilder) { + this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder); + } + public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) { this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder); } @@ -326,6 +340,7 @@ public class Selector implements Selectable { SelectionKey key = iterator.next(); iterator.remove(); KafkaChannel channel = channel(key); + long channelStartTimeNanos = recordTimePerConnection ? 
time.nanoseconds() : 0; // register all per-connection metrics at once sensors.maybeRegisterConnectionMetrics(channel.id()); @@ -380,10 +395,18 @@ public class Selector implements Selectable { else log.warn("Unexpected error from {}; closing connection", desc, e); close(channel, true); + } finally { + maybeRecordTimePerConnection(channel, channelStartTimeNanos); } } } + // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy) + private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) { + if (recordTimePerConnection) + channel.addNetworkThreadTimeNanos(time.nanoseconds() - startTimeNanos); + } + @Override public List<Send> completedSends() { return this.completedSends; http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 9e7ce1d..b98a33e 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.protocol.types.Schema; +import org.apache.kafka.common.protocol.types.SchemaException; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -25,35 +26,43 @@ import java.nio.ByteBuffer; * Identifiers for all the Kafka APIs */ public enum ApiKeys { - PRODUCE(0, "Produce"), - FETCH(1, "Fetch"), - LIST_OFFSETS(2, "Offsets"), - METADATA(3, "Metadata"), - LEADER_AND_ISR(4, "LeaderAndIsr"), - STOP_REPLICA(5, "StopReplica"), - UPDATE_METADATA_KEY(6, "UpdateMetadata"), - CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown"), - OFFSET_COMMIT(8, "OffsetCommit"), - OFFSET_FETCH(9, "OffsetFetch"), - 
FIND_COORDINATOR(10, "FindCoordinator"), - JOIN_GROUP(11, "JoinGroup"), - HEARTBEAT(12, "Heartbeat"), - LEAVE_GROUP(13, "LeaveGroup"), - SYNC_GROUP(14, "SyncGroup"), - DESCRIBE_GROUPS(15, "DescribeGroups"), - LIST_GROUPS(16, "ListGroups"), - SASL_HANDSHAKE(17, "SaslHandshake"), - API_VERSIONS(18, "ApiVersions"), - CREATE_TOPICS(19, "CreateTopics"), - DELETE_TOPICS(20, "DeleteTopics"), - DELETE_RECORDS(21, "DeleteRecords"), - INIT_PRODUCER_ID(22, "InitProducerId"), - OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch"), - ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn"), - ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn"), - END_TXN(26, "EndTxn"), - WRITE_TXN_MARKERS(27, "WriteTxnMarkers"), - TXN_OFFSET_COMMIT(28, "TxnOffsetCommit"); + PRODUCE(0, "Produce", false), + FETCH(1, "Fetch", false), + LIST_OFFSETS(2, "Offsets", false), + METADATA(3, "Metadata", false), + LEADER_AND_ISR(4, "LeaderAndIsr", true), + STOP_REPLICA(5, "StopReplica", true), + UPDATE_METADATA_KEY(6, "UpdateMetadata", true), + CONTROLLED_SHUTDOWN_KEY(7, "ControlledShutdown", true), + OFFSET_COMMIT(8, "OffsetCommit", false), + OFFSET_FETCH(9, "OffsetFetch", false), + FIND_COORDINATOR(10, "FindCoordinator", false), + JOIN_GROUP(11, "JoinGroup", false), + HEARTBEAT(12, "Heartbeat", false), + LEAVE_GROUP(13, "LeaveGroup", false), + SYNC_GROUP(14, "SyncGroup", false), + DESCRIBE_GROUPS(15, "DescribeGroups", false), + LIST_GROUPS(16, "ListGroups", false), + SASL_HANDSHAKE(17, "SaslHandshake", false), + API_VERSIONS(18, "ApiVersions", false) { + @Override + public Struct parseResponse(short version, ByteBuffer buffer) { + // Fallback to version 0 for ApiVersions response. If a client sends an ApiVersionsRequest + // using a version higher than that supported by the broker, a version 0 response is sent + // to the client indicating UNSUPPORTED_VERSION. 
+ return parseResponse(version, buffer, (short) 0); + } + }, + CREATE_TOPICS(19, "CreateTopics", false), + DELETE_TOPICS(20, "DeleteTopics", false), + DELETE_RECORDS(21, "DeleteRecords", false), + INIT_PRODUCER_ID(22, "InitProducerId", false), + OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch", true), + ADD_PARTITIONS_TO_TXN(24, "AddPartitionsToTxn", false), + ADD_OFFSETS_TO_TXN(25, "AddOffsetsToTxn", false), + END_TXN(26, "EndTxn", false), + WRITE_TXN_MARKERS(27, "WriteTxnMarkers", true), + TXN_OFFSET_COMMIT(28, "TxnOffsetCommit", false); private static final ApiKeys[] ID_TO_TYPE; private static final int MIN_API_KEY = 0; @@ -76,11 +85,15 @@ public enum ApiKeys { /** an english description of the api--this is for debugging and can change */ public final String name; - ApiKeys(int id, String name) { + /** indicates if this is a ClusterAction request used only by brokers */ + public final boolean clusterAction; + + ApiKeys(int id, String name, boolean clusterAction) { if (id < 0) throw new IllegalArgumentException("id must not be negative, id: " + id); this.id = (short) id; this.name = name; + this.clusterAction = clusterAction; } public static ApiKeys forId(int id) { @@ -122,6 +135,19 @@ public enum ApiKeys { return responseSchema(version).read(buffer); } + protected Struct parseResponse(short version, ByteBuffer buffer, short fallbackVersion) { + int bufferPosition = buffer.position(); + try { + return responseSchema(version).read(buffer); + } catch (SchemaException e) { + if (version != fallbackVersion) { + buffer.position(bufferPosition); + return responseSchema(fallbackVersion).read(buffer); + } else + throw e; + } + } + private Schema schemaFor(Schema[][] schemas, short version) { if (id > schemas.length) throw new IllegalArgumentException("No schema available for API key " + this); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java index 54f533e..3da2b3f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java @@ -65,6 +65,9 @@ public class Protocol { /* The v2 metadata request is the same as v1. An additional field for cluster id has been added to the v2 metadata response */ public static final Schema METADATA_REQUEST_V2 = METADATA_REQUEST_V1; + /* The v3 metadata request is the same as v1 and v2. An additional field for throttle time has been added to the v3 metadata response */ + public static final Schema METADATA_REQUEST_V3 = METADATA_REQUEST_V2; + public static final Schema METADATA_BROKER_V0 = new Schema(new Field("node_id", INT32, "The broker id."), new Field("host", STRING, "The hostname of the broker."), new Field("port", INT32, @@ -129,9 +132,19 @@ public class Protocol { "The broker id of the controller broker."), new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); + public static final Schema METADATA_RESPONSE_V3 = new Schema( + newThrottleTimeField(), + new Field("brokers", new ArrayOf(METADATA_BROKER_V1), + "Host and port information for all brokers."), + new Field("cluster_id", NULLABLE_STRING, + "The cluster id that this broker belongs to."), + new Field("controller_id", INT32, + "The broker id of the controller broker."), + new Field("topic_metadata", new ArrayOf(TOPIC_METADATA_V1))); + - public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, METADATA_REQUEST_V2}; - public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2}; + public static final Schema[] METADATA_REQUEST = new Schema[] {METADATA_REQUEST_V0, METADATA_REQUEST_V1, 
METADATA_REQUEST_V2, METADATA_REQUEST_V3}; + public static final Schema[] METADATA_RESPONSE = new Schema[] {METADATA_RESPONSE_V0, METADATA_RESPONSE_V1, METADATA_RESPONSE_V2, METADATA_RESPONSE_V3}; /* Produce api */ @@ -190,11 +203,7 @@ public class Protocol { INT16), new Field("base_offset", INT64))))))), - new Field("throttle_time_ms", - INT32, - "Duration in milliseconds for which the request was throttled" + - " due to quota violation. (Zero if the request did not violate any quota.)", - 0)); + newThrottleTimeField()); /** * PRODUCE_RESPONSE_V2 added a timestamp field in the per partition response status. * The timestamp is log append time if the topic is configured to use log append time. Or it is NoTimestamp when create @@ -215,11 +224,7 @@ public class Protocol { "If CreateTime is used for the topic, the timestamp will be -1. " + "If LogAppendTime is used for the topic, the timestamp will be " + "the broker local time when the messages are appended."))))))), - new Field("throttle_time_ms", - INT32, - "Duration in milliseconds for which the request was throttled" + - " due to quota violation. (Zero if the request did not violate any quota.)", - 0)); + newThrottleTimeField()); public static final Schema PRODUCE_RESPONSE_V3 = PRODUCE_RESPONSE_V2; public static final Schema[] PRODUCE_REQUEST = new Schema[] {PRODUCE_REQUEST_V0, PRODUCE_REQUEST_V1, PRODUCE_REQUEST_V2, PRODUCE_REQUEST_V3}; @@ -316,6 +321,9 @@ public class Protocol { new ArrayOf(OFFSET_COMMIT_REQUEST_TOPIC_V2), "Topics to commit offsets.")); + /* v3 request is same as v2. 
Throttle time has been added to response */ + public static final Schema OFFSET_COMMIT_REQUEST_V3 = OFFSET_COMMIT_REQUEST_V2; + public static final Schema OFFSET_COMMIT_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -329,13 +337,18 @@ public class Protocol { public static final Schema OFFSET_COMMIT_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); - public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2}; + public static final Schema[] OFFSET_COMMIT_REQUEST = new Schema[] {OFFSET_COMMIT_REQUEST_V0, OFFSET_COMMIT_REQUEST_V1, OFFSET_COMMIT_REQUEST_V2, OFFSET_COMMIT_REQUEST_V3}; /* The response types for V0, V1 and V2 of OFFSET_COMMIT_REQUEST are the same. */ public static final Schema OFFSET_COMMIT_RESPONSE_V1 = OFFSET_COMMIT_RESPONSE_V0; public static final Schema OFFSET_COMMIT_RESPONSE_V2 = OFFSET_COMMIT_RESPONSE_V0; - public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2}; + public static final Schema OFFSET_COMMIT_RESPONSE_V3 = new Schema( + newThrottleTimeField(), + new Field("responses", + new ArrayOf(OFFSET_COMMIT_RESPONSE_TOPIC_V0))); + + public static final Schema[] OFFSET_COMMIT_RESPONSE = new Schema[] {OFFSET_COMMIT_RESPONSE_V0, OFFSET_COMMIT_RESPONSE_V1, OFFSET_COMMIT_RESPONSE_V2, OFFSET_COMMIT_RESPONSE_V3}; /* Offset fetch api */ @@ -401,8 +414,17 @@ public class Protocol { new Field("error_code", INT16)); - public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2}; - public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2}; + /* v3 request is the same as v2. 
Throttle time has been added to v3 response */ + public static final Schema OFFSET_FETCH_REQUEST_V3 = OFFSET_FETCH_REQUEST_V2; + public static final Schema OFFSET_FETCH_RESPONSE_V3 = new Schema( + newThrottleTimeField(), + new Field("responses", + new ArrayOf(OFFSET_FETCH_RESPONSE_TOPIC_V0)), + new Field("error_code", + INT16)); + + public static final Schema[] OFFSET_FETCH_REQUEST = new Schema[] {OFFSET_FETCH_REQUEST_V0, OFFSET_FETCH_REQUEST_V1, OFFSET_FETCH_REQUEST_V2, OFFSET_FETCH_REQUEST_V3}; + public static final Schema[] OFFSET_FETCH_RESPONSE = new Schema[] {OFFSET_FETCH_RESPONSE_V0, OFFSET_FETCH_RESPONSE_V1, OFFSET_FETCH_RESPONSE_V2, OFFSET_FETCH_RESPONSE_V3}; /* List offset api */ public static final Schema LIST_OFFSET_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -445,6 +467,9 @@ public class Protocol { new ArrayOf(LIST_OFFSET_REQUEST_TOPIC_V1), "Topics to list offsets.")); + /* v2 request is the same as v1. Throttle time has been added to response */ + public static final Schema LIST_OFFSET_REQUEST_V2 = LIST_OFFSET_REQUEST_V1; + public static final Schema LIST_OFFSET_RESPONSE_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), @@ -477,9 +502,13 @@ public class Protocol { public static final Schema LIST_OFFSET_RESPONSE_V1 = new Schema(new Field("responses", new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1))); + public static final Schema LIST_OFFSET_RESPONSE_V2 = new Schema( + newThrottleTimeField(), + new Field("responses", + new ArrayOf(LIST_OFFSET_RESPONSE_TOPIC_V1))); - public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1}; - public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] {LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1}; + public static final Schema[] LIST_OFFSET_REQUEST = new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2}; + public static final Schema[] LIST_OFFSET_RESPONSE = new Schema[] 
{LIST_OFFSET_RESPONSE_V0, LIST_OFFSET_RESPONSE_V1, LIST_OFFSET_RESPONSE_V2}; /* Fetch api */ public static final Schema FETCH_REQUEST_PARTITION_V0 = new Schema(new Field("partition", @@ -630,11 +659,7 @@ public class Protocol { public static final Schema FETCH_RESPONSE_V0 = new Schema(new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); - public static final Schema FETCH_RESPONSE_V1 = new Schema(new Field("throttle_time_ms", - INT32, - "Duration in milliseconds for which the request was throttled" + - " due to quota violation. (Zero if the request did not violate any quota.)", - 0), + public static final Schema FETCH_RESPONSE_V1 = new Schema(newThrottleTimeField(), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V0))); // Even though fetch response v2 has the same protocol as v1, the record set in the response is different. In v1, @@ -703,19 +728,11 @@ public class Protocol { new Field("partition_responses", new ArrayOf(FETCH_RESPONSE_PARTITION_V5))); public static final Schema FETCH_RESPONSE_V4 = new Schema( - new Field("throttle_time_ms", - INT32, - "Duration in milliseconds for which the request was throttled " + - "due to quota violation (zero if the request did not violate any quota).", - 0), + newThrottleTimeField(), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V4))); public static final Schema FETCH_RESPONSE_V5 = new Schema( - new Field("throttle_time_ms", - INT32, - "Duration in milliseconds for which the request was throttled " + - "due to quota violation (zero if the request did not violate any quota).", - 0), + newThrottleTimeField(), new Field("responses", new ArrayOf(FETCH_RESPONSE_TOPIC_V5))); public static final Schema[] FETCH_REQUEST = new Schema[] {FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4, FETCH_REQUEST_V5}; @@ -724,19 +741,29 @@ public class Protocol { /* List groups api */ public static final Schema LIST_GROUPS_REQUEST_V0 = new Schema(); + /* v1 request is the same as 
v0. Throttle time has been added to response */ + public static final Schema LIST_GROUPS_REQUEST_V1 = LIST_GROUPS_REQUEST_V0; + public static final Schema LIST_GROUPS_RESPONSE_GROUP_V0 = new Schema(new Field("group_id", STRING), new Field("protocol_type", STRING)); public static final Schema LIST_GROUPS_RESPONSE_V0 = new Schema(new Field("error_code", INT16), new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); + public static final Schema LIST_GROUPS_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16), + new Field("groups", new ArrayOf(LIST_GROUPS_RESPONSE_GROUP_V0))); - public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0}; - public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0}; + public static final Schema[] LIST_GROUPS_REQUEST = new Schema[] {LIST_GROUPS_REQUEST_V0, LIST_GROUPS_REQUEST_V1}; + public static final Schema[] LIST_GROUPS_RESPONSE = new Schema[] {LIST_GROUPS_RESPONSE_V0, LIST_GROUPS_RESPONSE_V1}; /* Describe group api */ public static final Schema DESCRIBE_GROUPS_REQUEST_V0 = new Schema(new Field("group_ids", new ArrayOf(STRING), "List of groupIds to request metadata for (an empty groupId array will return empty group metadata).")); + /* v1 request is the same as v0. 
Throttle time has been added to response */ + public static final Schema DESCRIBE_GROUPS_REQUEST_V1 = DESCRIBE_GROUPS_REQUEST_V0; + public static final Schema DESCRIBE_GROUPS_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING, "The memberId assigned by the coordinator"), @@ -770,9 +797,12 @@ public class Protocol { "Current group members (only provided if the group is not Dead)")); public static final Schema DESCRIBE_GROUPS_RESPONSE_V0 = new Schema(new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); + public static final Schema DESCRIBE_GROUPS_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("groups", new ArrayOf(DESCRIBE_GROUPS_RESPONSE_GROUP_METADATA_V0))); - public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0}; - public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0}; + public static final Schema[] DESCRIBE_GROUPS_REQUEST = new Schema[] {DESCRIBE_GROUPS_REQUEST_V0, DESCRIBE_GROUPS_REQUEST_V1}; + public static final Schema[] DESCRIBE_GROUPS_RESPONSE = new Schema[] {DESCRIBE_GROUPS_RESPONSE_V0, DESCRIBE_GROUPS_RESPONSE_V1}; /* Find coordinator api */ public static final Schema FIND_COORDINATOR_REQUEST_V0 = new Schema( @@ -802,6 +832,7 @@ public class Protocol { "Host and port information for the coordinator for a consumer group.")); public static final Schema FIND_COORDINATOR_RESPONSE_V1 = new Schema( + newThrottleTimeField(), new Field("error_code", INT16), new Field("error_message", NULLABLE_STRING), new Field("coordinator", @@ -870,6 +901,9 @@ public class Protocol { new ArrayOf(JOIN_GROUP_REQUEST_PROTOCOL_V0), "List of protocols that the member supports")); + /* v2 request is the same as v1. 
Throttle time has been added to response */ + public static final Schema JOIN_GROUP_REQUEST_V2 = JOIN_GROUP_REQUEST_V1; + public static final Schema JOIN_GROUP_RESPONSE_MEMBER_V0 = new Schema(new Field("member_id", STRING), new Field("member_metadata", BYTES)); @@ -890,9 +924,27 @@ public class Protocol { new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); public static final Schema JOIN_GROUP_RESPONSE_V1 = JOIN_GROUP_RESPONSE_V0; - - public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1}; - public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1}; + public static final Schema JOIN_GROUP_RESPONSE_V2 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16), + new Field("generation_id", + INT32, + "The generation of the consumer group."), + new Field("group_protocol", + STRING, + "The group protocol selected by the coordinator"), + new Field("leader_id", + STRING, + "The leader of the group"), + new Field("member_id", + STRING, + "The consumer id assigned by the group coordinator."), + new Field("members", + new ArrayOf(JOIN_GROUP_RESPONSE_MEMBER_V0))); + + + public static final Schema[] JOIN_GROUP_REQUEST = new Schema[] {JOIN_GROUP_REQUEST_V0, JOIN_GROUP_REQUEST_V1, JOIN_GROUP_REQUEST_V2}; + public static final Schema[] JOIN_GROUP_RESPONSE = new Schema[] {JOIN_GROUP_RESPONSE_V0, JOIN_GROUP_RESPONSE_V1, JOIN_GROUP_RESPONSE_V2}; /* SyncGroup api */ public static final Schema SYNC_GROUP_REQUEST_MEMBER_V0 = new Schema(new Field("member_id", STRING), @@ -901,10 +953,18 @@ public class Protocol { new Field("generation_id", INT32), new Field("member_id", STRING), new Field("group_assignment", new ArrayOf(SYNC_GROUP_REQUEST_MEMBER_V0))); + + /* v1 request is the same as v0. 
Throttle time has been added to response */ + public static final Schema SYNC_GROUP_REQUEST_V1 = SYNC_GROUP_REQUEST_V0; + public static final Schema SYNC_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16), new Field("member_assignment", BYTES)); - public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0}; - public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0}; + public static final Schema SYNC_GROUP_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16), + new Field("member_assignment", BYTES)); + public static final Schema[] SYNC_GROUP_REQUEST = new Schema[] {SYNC_GROUP_REQUEST_V0, SYNC_GROUP_REQUEST_V1}; + public static final Schema[] SYNC_GROUP_RESPONSE = new Schema[] {SYNC_GROUP_RESPONSE_V0, SYNC_GROUP_RESPONSE_V1}; /* Heartbeat api */ public static final Schema HEARTBEAT_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), @@ -915,10 +975,16 @@ public class Protocol { STRING, "The member id assigned by the group coordinator.")); + /* v1 request is the same as v0. 
Throttle time has been added to response */ + public static final Schema HEARTBEAT_REQUEST_V1 = HEARTBEAT_REQUEST_V0; + public static final Schema HEARTBEAT_RESPONSE_V0 = new Schema(new Field("error_code", INT16)); + public static final Schema HEARTBEAT_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16)); - public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0}; - public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0}; + public static final Schema[] HEARTBEAT_REQUEST = new Schema[] {HEARTBEAT_REQUEST_V0, HEARTBEAT_REQUEST_V1}; + public static final Schema[] HEARTBEAT_RESPONSE = new Schema[] {HEARTBEAT_RESPONSE_V0, HEARTBEAT_RESPONSE_V1}; /* Leave group api */ public static final Schema LEAVE_GROUP_REQUEST_V0 = new Schema(new Field("group_id", STRING, "The group id."), @@ -926,10 +992,16 @@ public class Protocol { STRING, "The member id assigned by the group coordinator.")); + /* v1 request is the same as v0. 
Throttle time has been added to response */ + public static final Schema LEAVE_GROUP_REQUEST_V1 = LEAVE_GROUP_REQUEST_V0; + public static final Schema LEAVE_GROUP_RESPONSE_V0 = new Schema(new Field("error_code", INT16)); + public static final Schema LEAVE_GROUP_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("error_code", INT16)); - public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0}; - public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0}; + public static final Schema[] LEAVE_GROUP_REQUEST = new Schema[] {LEAVE_GROUP_REQUEST_V0, LEAVE_GROUP_REQUEST_V1}; + public static final Schema[] LEAVE_GROUP_RESPONSE = new Schema[] {LEAVE_GROUP_RESPONSE_V0, LEAVE_GROUP_RESPONSE_V1}; /* Leader and ISR api */ public static final Schema LEADER_AND_ISR_REQUEST_PARTITION_STATE_V0 = @@ -1082,15 +1154,22 @@ public class Protocol { /* ApiVersion api */ public static final Schema API_VERSIONS_REQUEST_V0 = new Schema(); + /* v1 request is the same as v0. 
Throttle time has been added to response */ + public static final Schema API_VERSIONS_REQUEST_V1 = API_VERSIONS_REQUEST_V0; + public static final Schema API_VERSIONS_V0 = new Schema(new Field("api_key", INT16, "API key."), new Field("min_version", INT16, "Minimum supported version."), new Field("max_version", INT16, "Maximum supported version.")); public static final Schema API_VERSIONS_RESPONSE_V0 = new Schema(new Field("error_code", INT16, "Error code."), new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker.")); + public static final Schema API_VERSIONS_RESPONSE_V1 = new Schema( + new Field("error_code", INT16, "Error code."), + new Field("api_versions", new ArrayOf(API_VERSIONS_V0), "API versions supported by the broker."), + newThrottleTimeField()); - public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0}; - public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0}; + public static final Schema[] API_VERSIONS_REQUEST = new Schema[]{API_VERSIONS_REQUEST_V0, API_VERSIONS_REQUEST_V1}; + public static final Schema[] API_VERSIONS_RESPONSE = new Schema[]{API_VERSIONS_RESPONSE_V0, API_VERSIONS_RESPONSE_V1}; /* Admin requests common */ public static final Schema CONFIG_ENTRY = new Schema(new Field("config_key", STRING, "Configuration key name"), @@ -1154,9 +1233,16 @@ public class Protocol { new Field("topic_errors", new ArrayOf(TOPIC_ERROR), "An array of per topic errors.")); + /* v2 request is the same as v1. 
Throttle time has been added to the response */ + public static final Schema CREATE_TOPICS_REQUEST_V2 = CREATE_TOPICS_REQUEST_V1; + public static final Schema CREATE_TOPICS_RESPONSE_V2 = new Schema( + newThrottleTimeField(), + new Field("topic_errors", + new ArrayOf(TOPIC_ERROR), + "An array of per topic errors.")); - public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1}; - public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1}; + public static final Schema[] CREATE_TOPICS_REQUEST = new Schema[] {CREATE_TOPICS_REQUEST_V0, CREATE_TOPICS_REQUEST_V1, CREATE_TOPICS_REQUEST_V2}; + public static final Schema[] CREATE_TOPICS_RESPONSE = new Schema[] {CREATE_TOPICS_RESPONSE_V0, CREATE_TOPICS_RESPONSE_V1, CREATE_TOPICS_RESPONSE_V2}; /* DeleteTopic api */ public static final Schema DELETE_TOPICS_REQUEST_V0 = new Schema( @@ -1171,9 +1257,16 @@ public class Protocol { new Field("topic_error_codes", new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); + /* v1 request is the same as v0. 
Throttle time has been added to the response */ + public static final Schema DELETE_TOPICS_REQUEST_V1 = DELETE_TOPICS_REQUEST_V0; + public static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema( + newThrottleTimeField(), + new Field("topic_error_codes", + new ArrayOf(TOPIC_ERROR_CODE), + "An array of per topic error codes.")); - public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0}; - public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0}; + public static final Schema[] DELETE_TOPICS_REQUEST = new Schema[] {DELETE_TOPICS_REQUEST_V0, DELETE_TOPICS_REQUEST_V1}; + public static final Schema[] DELETE_TOPICS_RESPONSE = new Schema[] {DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1}; public static final Schema DELETE_RECORDS_REQUEST_PARTITION_V0 = new Schema(new Field("partition", INT32, "Topic partition id."), new Field("offset", INT64, "The offset before which the messages will be deleted.")); @@ -1191,7 +1284,9 @@ public class Protocol { public static final Schema DELETE_RECORDS_RESPONSE_TOPIC_V0 = new Schema(new Field("topic", STRING, "Topic name."), new Field("partitions", new ArrayOf(DELETE_RECORDS_RESPONSE_PARTITION_V0))); - public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema(new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); + public static final Schema DELETE_RECORDS_RESPONSE_V0 = new Schema( + newThrottleTimeField(), + new Field("topics", new ArrayOf(DELETE_RECORDS_RESPONSE_TOPIC_V0))); public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0}; public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0}; @@ -1207,6 +1302,7 @@ public class Protocol { ); public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema( + newThrottleTimeField(), new Field("error_code", INT16, "An integer error code."), @@ -1289,6 +1385,7 @@ public class Protocol { "The partitions to add 
to the transaction.") ); public static final Schema ADD_PARTITIONS_TO_TXN_RESPONSE_V0 = new Schema( + newThrottleTimeField(), new Field("error_code", INT16, "An integer error code.") @@ -1312,6 +1409,7 @@ public class Protocol { "Consumer group id whose offsets should be included in the transaction.") ); public static final Schema ADD_OFFSETS_TO_TXN_RESPONSE_V0 = new Schema( + newThrottleTimeField(), new Field("error_code", INT16, "An integer error code.") @@ -1336,6 +1434,7 @@ public class Protocol { ); public static final Schema END_TXN_RESPONSE_V0 = new Schema( + newThrottleTimeField(), new Field("error_code", INT16, "An integer error code.") @@ -1425,6 +1524,7 @@ public class Protocol { ); public static final Schema TXN_OFFSET_COMMIT_RESPONSE_V0 = new Schema( + newThrottleTimeField(), new Field("topics", new ArrayOf(new Schema( new Field("topic", STRING), @@ -1535,6 +1635,12 @@ public class Protocol { return apiKey < CURR_VERSION.length && apiVersion >= MIN_VERSIONS[apiKey] && apiVersion <= CURR_VERSION[apiKey]; } + private static Field newThrottleTimeField() { + return new Field("throttle_time_ms", INT32, + "Duration in milliseconds for which the request was throttled due to quota violation. 
(Zero if the request did not violate any quota.)", + 0); + } + private static String indentString(int size) { StringBuilder b = new StringBuilder(size); for (int i = 0; i < size; i++) http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 07bde63..04f2602 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -95,7 +95,14 @@ public abstract class AbstractRequest extends AbstractRequestResponse { /** * Get an error response for a request */ - public abstract AbstractResponse getErrorResponse(Throwable e); + public AbstractResponse getErrorResponse(Throwable e) { + return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e); + } + + /** + * Get an error response for a request with specified throttle time in the response if applicable + */ + public abstract AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e); /** * Factory method for getting a request object based on ApiKey ID and a buffer http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 2286783..b76cb21 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -24,6 +24,7 @@ import 
org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public abstract class AbstractResponse extends AbstractRequestResponse { + public static final int DEFAULT_THROTTLE_TIME = 0; public Send toSend(String destination, RequestHeader requestHeader) { return toSend(destination, requestHeader.apiVersion(), requestHeader.toResponseHeader()); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java index 4245e82..733e806 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnRequest.java @@ -96,8 +96,8 @@ public class AddOffsetsToTxnRequest extends AbstractRequest { } @Override - public AddOffsetsToTxnResponse getErrorResponse(Throwable e) { - return new AddOffsetsToTxnResponse(Errors.forException(e)); + public AddOffsetsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new AddOffsetsToTxnResponse(throttleTimeMs, Errors.forException(e)); } public static AddOffsetsToTxnRequest parse(ByteBuffer buffer, short version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 2426514..8c41ae4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class AddOffsetsToTxnResponse extends AbstractResponse { + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; // Possible error codes: @@ -35,15 +36,22 @@ public class AddOffsetsToTxnResponse extends AbstractResponse { // InvalidProducerEpoch private final Errors error; + private final int throttleTimeMs; - public AddOffsetsToTxnResponse(Errors error) { + public AddOffsetsToTxnResponse(int throttleTimeMs, Errors error) { + this.throttleTimeMs = throttleTimeMs; this.error = error; } public AddOffsetsToTxnResponse(Struct struct) { + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Errors error() { return error; } @@ -51,6 +59,7 @@ public class AddOffsetsToTxnResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ADD_OFFSETS_TO_TXN.responseSchema(version)); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); struct.set(ERROR_CODE_KEY_NAME, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java index 9a983d0..5bbea61 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnRequest.java 
@@ -125,8 +125,8 @@ public class AddPartitionsToTxnRequest extends AbstractRequest { } @Override - public AddPartitionsToTxnResponse getErrorResponse(Throwable e) { - return new AddPartitionsToTxnResponse(Errors.forException(e)); + public AddPartitionsToTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new AddPartitionsToTxnResponse(throttleTimeMs, Errors.forException(e)); } public static AddPartitionsToTxnRequest parse(ByteBuffer buffer, short version) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index 0337044..893fcda 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class AddPartitionsToTxnResponse extends AbstractResponse { + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; // Possible error codes: @@ -35,15 +36,22 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { // InvalidProducerEpoch private final Errors error; + private final int throttleTimeMs; - public AddPartitionsToTxnResponse(Errors error) { + public AddPartitionsToTxnResponse(int throttleTimeMs, Errors error) { + this.throttleTimeMs = throttleTimeMs; this.error = error; } public AddPartitionsToTxnResponse(Struct struct) { + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); } + public int 
throttleTimeMs() { + return throttleTimeMs; + } + public Errors error() { return error; } @@ -51,6 +59,7 @@ public class AddPartitionsToTxnResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.ADD_PARTITIONS_TO_TXN.responseSchema(version)); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); struct.set(ERROR_CODE_KEY_NAME, error.code()); return struct; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java index 07dd5f5..6f63040 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsRequest.java @@ -30,6 +30,10 @@ public class ApiVersionsRequest extends AbstractRequest { super(ApiKeys.API_VERSIONS); } + public Builder(short version) { + super(ApiKeys.API_VERSIONS, version); + } + @Override public ApiVersionsRequest build(short version) { return new ApiVersionsRequest(version); @@ -55,11 +59,13 @@ public class ApiVersionsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { short versionId = version(); switch (versionId) { case 0: return new ApiVersionsResponse(Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList()); + case 1: + return new ApiVersionsResponse(throttleTimeMs, Errors.forException(e), Collections.<ApiVersionsResponse.ApiVersion>emptyList()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.API_VERSIONS.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index 382da89..d434c75 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -31,7 +31,8 @@ import java.util.Map; public class ApiVersionsResponse extends AbstractResponse { - public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(); + public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME); + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String API_VERSIONS_KEY_NAME = "api_versions"; public static final String API_KEY_NAME = "api_key"; @@ -44,6 +45,7 @@ public class ApiVersionsResponse extends AbstractResponse { * UNSUPPORTED_VERSION (33) */ private final Errors error; + private final int throttleTimeMs; private final Map<Short, ApiVersion> apiKeyToApiVersion; public static final class ApiVersion { @@ -72,11 +74,17 @@ public class ApiVersionsResponse extends AbstractResponse { } public ApiVersionsResponse(Errors error, List<ApiVersion> apiVersions) { + this(DEFAULT_THROTTLE_TIME, error, apiVersions); + } + + public ApiVersionsResponse(int throttleTimeMs, Errors error, List<ApiVersion> apiVersions) { + this.throttleTimeMs = throttleTimeMs; this.error = error; this.apiKeyToApiVersion = buildApiKeyToApiVersion(apiVersions); } public 
ApiVersionsResponse(Struct struct) { + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME)); List<ApiVersion> tempApiVersions = new ArrayList<>(); for (Object apiVersionsObj : struct.getArray(API_VERSIONS_KEY_NAME)) { @@ -92,6 +100,8 @@ public class ApiVersionsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.API_VERSIONS.responseSchema(version)); + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); struct.set(ERROR_CODE_KEY_NAME, error.code()); List<Struct> apiVersionList = new ArrayList<>(); for (ApiVersion apiVersion : apiKeyToApiVersion.values()) { @@ -105,15 +115,26 @@ public class ApiVersionsResponse extends AbstractResponse { return struct; } + public static ApiVersionsResponse apiVersionsResponse(short version, int throttleTimeMs) { + if (throttleTimeMs == 0 || version == 0) + return API_VERSIONS_RESPONSE; + else + return createApiVersionsResponse(throttleTimeMs); + } + /** * Returns Errors.UNSUPPORTED_VERSION response with version 0 since we don't support the requested version. 
*/ public static Send unsupportedVersionSend(String destination, RequestHeader requestHeader) { - ApiVersionsResponse response = new ApiVersionsResponse(Errors.UNSUPPORTED_VERSION, + ApiVersionsResponse response = new ApiVersionsResponse(DEFAULT_THROTTLE_TIME, Errors.UNSUPPORTED_VERSION, Collections.<ApiVersion>emptyList()); return response.toSend(destination, (short) 0, requestHeader.toResponseHeader()); } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Collection<ApiVersion> apiVersions() { return apiKeyToApiVersion.values(); } @@ -127,15 +148,15 @@ public class ApiVersionsResponse extends AbstractResponse { } public static ApiVersionsResponse parse(ByteBuffer buffer, short version) { - return new ApiVersionsResponse(ApiKeys.API_VERSIONS.responseSchema(version).read(buffer)); + return new ApiVersionsResponse(ApiKeys.API_VERSIONS.parseResponse(version, buffer)); } - private static ApiVersionsResponse createApiVersionsResponse() { + public static ApiVersionsResponse createApiVersionsResponse(int throttleTimeMs) { List<ApiVersion> versionList = new ArrayList<>(); for (ApiKeys apiKey : ApiKeys.values()) { versionList.add(new ApiVersion(apiKey)); } - return new ApiVersionsResponse(Errors.NONE, versionList); + return new ApiVersionsResponse(throttleTimeMs, Errors.NONE, versionList); } private Map<Short, ApiVersion> buildApiKeyToApiVersion(List<ApiVersion> apiVersions) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java index 4b5ec13..ee41665 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java +++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ControlledShutdownRequest.java @@ -62,7 +62,7 @@ public class ControlledShutdownRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { short versionId = version(); switch (versionId) { case 0: http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index 072dde8..a0626cc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -209,7 +209,7 @@ public class CreateTopicsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>(); for (String topic : topics.keySet()) { Errors error = Errors.forException(e); @@ -223,6 +223,8 @@ public class CreateTopicsRequest extends AbstractRequest { case 0: case 1: return new CreateTopicsResponse(topicErrors); + case 2: + return new CreateTopicsResponse(throttleTimeMs, topicErrors); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.CREATE_TOPICS.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index 33a4b4a..54f9764 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; public class CreateTopicsResponse extends AbstractResponse { + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors"; private static final String TOPIC_KEY_NAME = "topic"; private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -82,8 +83,14 @@ public class CreateTopicsResponse extends AbstractResponse { */ private final Map<String, Error> errors; + private final int throttleTimeMs; public CreateTopicsResponse(Map<String, Error> errors) { + this(DEFAULT_THROTTLE_TIME, errors); + } + + public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) { + this.throttleTimeMs = throttleTimeMs; this.errors = errors; } @@ -100,12 +107,15 @@ public class CreateTopicsResponse extends AbstractResponse { errors.put(topic, new Error(error, errorMessage)); } + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; this.errors = errors; } @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.CREATE_TOPICS.responseSchema(version)); + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> topicErrorsStructs = new ArrayList<>(errors.size()); for (Map.Entry<String, Error> topicError : errors.entrySet()) { @@ -121,6 +131,10 @@ public class CreateTopicsResponse extends AbstractResponse { return struct; } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Map<String, Error> errors() { return errors; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java index 96f064c..fcd9836 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsRequest.java @@ -119,7 +119,7 @@ public class DeleteRecordsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Map<TopicPartition, DeleteRecordsResponse.PartitionResponse> responseMap = new HashMap<>(); for (Map.Entry<TopicPartition, Long> entry : partitionOffsets.entrySet()) { @@ -129,7 +129,7 @@ public class DeleteRecordsRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new DeleteRecordsResponse(responseMap); + return new DeleteRecordsResponse(throttleTimeMs, responseMap); default: throw new IllegalArgumentException(String.format("Version %d is not valid. 
Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.DELETE_RECORDS.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java index 45b518b..64165eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java @@ -33,6 +33,7 @@ public class DeleteRecordsResponse extends AbstractResponse { public static final long INVALID_LOW_WATERMARK = -1L; // request level key names + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPICS_KEY_NAME = "topics"; // topic level key names @@ -44,6 +45,7 @@ public class DeleteRecordsResponse extends AbstractResponse { private static final String LOW_WATERMARK_KEY_NAME = "low_watermark"; private static final String ERROR_CODE_KEY_NAME = "error_code"; + private final int throttleTimeMs; private final Map<TopicPartition, PartitionResponse> responses; /** @@ -80,6 +82,7 @@ public class DeleteRecordsResponse extends AbstractResponse { } public DeleteRecordsResponse(Struct struct) { + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; responses = new HashMap<>(); for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { Struct topicStruct = (Struct) topicStructObj; @@ -97,13 +100,16 @@ public class DeleteRecordsResponse extends AbstractResponse { /** * Constructor for version 0. 
*/ - public DeleteRecordsResponse(Map<TopicPartition, PartitionResponse> responses) { + public DeleteRecordsResponse(int throttleTimeMs, Map<TopicPartition, PartitionResponse> responses) { + this.throttleTimeMs = throttleTimeMs; this.responses = responses; } @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_RECORDS.responseSchema(version)); + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); Map<String, Map<Integer, PartitionResponse>> responsesByTopic = CollectionUtils.groupDataByTopic(responses); List<Struct> topicStructArray = new ArrayList<>(); for (Map.Entry<String, Map<Integer, PartitionResponse>> responsesByTopicEntry : responsesByTopic.entrySet()) { @@ -125,6 +131,10 @@ public class DeleteRecordsResponse extends AbstractResponse { return struct; } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Map<TopicPartition, PartitionResponse> responses() { return this.responses; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index ccbe211..2ea8c21 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -86,7 +86,7 @@ public class DeleteTopicsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { Map<String, Errors> topicErrors = new HashMap<>(); for (String topic : topics) topicErrors.put(topic, Errors.forException(e)); @@ -94,6 +94,8 @@ public class DeleteTopicsRequest 
extends AbstractRequest { switch (version()) { case 0: return new DeleteTopicsResponse(topicErrors); + case 1: + return new DeleteTopicsResponse(throttleTimeMs, topicErrors); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java index 9d0d0f3..1b80d1c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; public class DeleteTopicsResponse extends AbstractResponse { + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes"; private static final String TOPIC_KEY_NAME = "topic"; private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -40,12 +41,19 @@ public class DeleteTopicsResponse extends AbstractResponse { * NOT_CONTROLLER(41) */ private final Map<String, Errors> errors; + private final int throttleTimeMs; public DeleteTopicsResponse(Map<String, Errors> errors) { + this(DEFAULT_THROTTLE_TIME, errors); + } + + public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) { + this.throttleTimeMs = throttleTimeMs; this.errors = errors; } public DeleteTopicsResponse(Struct struct) { + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME); Map<String, Errors> errors = new HashMap<>(); for (Object topicErrorCodeStructObj : topicErrorCodesStructs) { @@ -61,6 +69,8 @@ public class DeleteTopicsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version)); + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size()); for (Map.Entry<String, Errors> topicError : errors.entrySet()) { Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME); @@ -72,6 +82,10 @@ public class DeleteTopicsResponse extends AbstractResponse { return struct; } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Map<String, Errors> errors() { return errors; } http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java index 287eda9..b43e254 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsRequest.java @@ -73,11 +73,13 @@ public class DescribeGroupsRequest extends AbstractRequest { } @Override - public AbstractResponse getErrorResponse(Throwable e) { + public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { short version = version(); switch (version) { case 0: return DescribeGroupsResponse.fromError(Errors.forException(e), groupIds); + case 1: + return 
DescribeGroupsResponse.fromError(throttleTimeMs, Errors.forException(e), groupIds); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index 797ed58..bd7a087 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -29,6 +29,7 @@ import java.util.Map; public class DescribeGroupsResponse extends AbstractResponse { + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String GROUPS_KEY_NAME = "groups"; private static final String ERROR_CODE_KEY_NAME = "error_code"; @@ -58,12 +59,19 @@ public class DescribeGroupsResponse extends AbstractResponse { */ private final Map<String, GroupMetadata> groups; + private final int throttleTimeMs; public DescribeGroupsResponse(Map<String, GroupMetadata> groups) { + this(DEFAULT_THROTTLE_TIME, groups); + } + + public DescribeGroupsResponse(int throttleTimeMs, Map<String, GroupMetadata> groups) { + this.throttleTimeMs = throttleTimeMs; this.groups = groups; } public DescribeGroupsResponse(Struct struct) { + this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; this.groups = new HashMap<>(); for (Object groupObj : struct.getArray(GROUPS_KEY_NAME)) { Struct groupStruct = (Struct) groupObj; @@ -89,6 +97,10 @@ public class DescribeGroupsResponse extends AbstractResponse { } } + public int throttleTimeMs() { + return throttleTimeMs; + } + public Map<String, GroupMetadata> groups() { return groups; } @@ -184,16 +196,22 @@ public class DescribeGroupsResponse extends AbstractResponse { } public static DescribeGroupsResponse fromError(Errors error, List<String> groupIds) { + return fromError(DEFAULT_THROTTLE_TIME, error, groupIds); + } + + public static DescribeGroupsResponse fromError(int throttleTimeMs, Errors error, List<String> groupIds) { GroupMetadata errorMetadata = GroupMetadata.forError(error); Map<String, GroupMetadata> groups = new HashMap<>(); for (String groupId : groupIds) groups.put(groupId, errorMetadata); - return new DescribeGroupsResponse(groups); + return new DescribeGroupsResponse(throttleTimeMs, groups); } @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_GROUPS.responseSchema(version)); + if (struct.hasField(THROTTLE_TIME_KEY_NAME)) + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> groupStructs = new ArrayList<>(); for (Map.Entry<String, GroupMetadata> groupEntry : groups.entrySet()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/0104b657/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java index e6eb54e..9c215be 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java @@ -96,8 +96,8 @@ public class EndTxnRequest 
extends AbstractRequest { } @Override - public EndTxnResponse getErrorResponse(Throwable e) { - return new EndTxnResponse(Errors.forException(e)); + public EndTxnResponse getErrorResponse(int throttleTimeMs, Throwable e) { + return new EndTxnResponse(throttleTimeMs, Errors.forException(e)); } public static EndTxnRequest parse(ByteBuffer buffer, short version) {