Repository: kafka
Updated Branches:
  refs/heads/trunk 20e200878 -> 73ca0d215


KAFKA-5320: Include all request throttling in client throttle metrics

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3137 from rajinisivaram/KAFKA-5320


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73ca0d21
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73ca0d21
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73ca0d21

Branch: refs/heads/trunk
Commit: 73ca0d215ead9574487744eb89f7ae677a9e13ea
Parents: 20e2008
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Thu May 25 20:28:18 2017 +0100
Committer: Rajini Sivaram <rajinisiva...@googlemail.com>
Committed: Thu May 25 20:28:18 2017 +0100

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |  2 +-
 .../org/apache/kafka/clients/NetworkClient.java | 40 +++++++++++--
 .../kafka/clients/consumer/KafkaConsumer.java   |  4 +-
 .../clients/consumer/internals/Fetcher.java     | 26 +++++----
 .../kafka/clients/producer/KafkaProducer.java   |  4 +-
 .../clients/producer/internals/Sender.java      | 25 ++++----
 .../kafka/common/requests/AbstractResponse.java |  1 +
 .../requests/AddOffsetsToTxnResponse.java       |  1 -
 .../requests/AddPartitionsToTxnResponse.java    |  1 -
 .../common/requests/AlterConfigsResponse.java   |  2 -
 .../common/requests/ApiVersionsResponse.java    |  1 -
 .../common/requests/CreateAclsResponse.java     |  5 +-
 .../common/requests/CreateTopicsResponse.java   |  1 -
 .../common/requests/DeleteAclsResponse.java     |  5 +-
 .../common/requests/DeleteRecordsResponse.java  |  1 -
 .../common/requests/DeleteTopicsResponse.java   |  1 -
 .../common/requests/DescribeAclsResponse.java   |  5 +-
 .../requests/DescribeConfigsResponse.java       |  2 -
 .../common/requests/DescribeGroupsResponse.java |  1 -
 .../kafka/common/requests/EndTxnResponse.java   |  1 -
 .../kafka/common/requests/FetchResponse.java    |  1 -
 .../requests/FindCoordinatorResponse.java       |  1 -
 .../common/requests/HeartbeatResponse.java      |  1 -
 .../common/requests/InitProducerIdResponse.java |  1 -
 .../common/requests/JoinGroupResponse.java      |  1 -
 .../common/requests/LeaveGroupResponse.java     |  1 -
 .../common/requests/ListGroupsResponse.java     |  1 -
 .../common/requests/ListOffsetResponse.java     |  1 -
 .../kafka/common/requests/MetadataResponse.java |  1 -
 .../common/requests/OffsetCommitResponse.java   |  1 -
 .../common/requests/OffsetFetchResponse.java    |  1 -
 .../kafka/common/requests/ProduceResponse.java  |  1 -
 .../common/requests/SyncGroupResponse.java      |  1 -
 .../requests/TxnOffsetCommitResponse.java       |  1 -
 .../clients/consumer/internals/FetcherTest.java | 60 ++++++++++++++------
 .../clients/producer/internals/SenderTest.java  | 45 ++++++++++++---
 36 files changed, 156 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3b865bc..a8d3033 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
+              files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 93fbb85..bfd0eb5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelState;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.network.Selectable;
@@ -104,6 +105,8 @@ public class NetworkClient implements KafkaClient {
 
     private final List<ClientResponse> abortedSends = new LinkedList<>();
 
+    private final Sensor throttleTimeSensor;
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -119,7 +122,27 @@ public class NetworkClient implements KafkaClient {
         this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
              reconnectBackoffMs, reconnectBackoffMax,
              socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions);
+             discoverBrokerVersions, apiVersions, null);
+    }
+
+    public NetworkClient(Selectable selector,
+            Metadata metadata,
+            String clientId,
+            int maxInFlightRequestsPerConnection,
+            long reconnectBackoffMs,
+            long reconnectBackoffMax,
+            int socketSendBuffer,
+            int socketReceiveBuffer,
+            int requestTimeoutMs,
+            Time time,
+            boolean discoverBrokerVersions,
+            ApiVersions apiVersions,
+            Sensor throttleTimeSensor) {
+
+        this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection,
+             reconnectBackoffMs, reconnectBackoffMax,
+             socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
+             discoverBrokerVersions, apiVersions, throttleTimeSensor);
     }
 
     public NetworkClient(Selectable selector,
@@ -137,7 +160,7 @@ public class NetworkClient implements KafkaClient {
         this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection,
              reconnectBackoffMs, reconnectBackoffMax,
              socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time,
-             discoverBrokerVersions, apiVersions);
+             discoverBrokerVersions, apiVersions, null);
     }
 
     private NetworkClient(MetadataUpdater metadataUpdater,
@@ -152,7 +175,8 @@ public class NetworkClient implements KafkaClient {
                           int requestTimeoutMs,
                           Time time,
                           boolean discoverBrokerVersions,
-                          ApiVersions apiVersions) {
+                          ApiVersions apiVersions,
+                          Sensor throttleTimeSensor) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
          * super constructor is invoked.
@@ -177,6 +201,7 @@ public class NetworkClient implements KafkaClient {
         this.time = time;
         this.discoverBrokerVersions = discoverBrokerVersions;
         this.apiVersions = apiVersions;
+        this.throttleTimeSensor = throttleTimeSensor;
     }
 
     /**
@@ -480,11 +505,18 @@ public class NetworkClient implements KafkaClient {
     }
 
     public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) {
+        return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0);
+    }
+
+    private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader,
+            Sensor throttleTimeSensor, long now) {
         ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer);
         // Always expect the response version id to be the same as the request version id
         ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey());
         Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer);
         correlate(requestHeader, responseHeader);
+        if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME))
+            throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now);
         return AbstractResponse.getResponse(apiKey, responseBody);
     }
 
@@ -572,7 +604,7 @@ public class NetworkClient implements KafkaClient {
         for (NetworkReceive receive : this.selector.completedReceives()) {
             String source = receive.source();
             InFlightRequest req = inFlightRequests.completeNext(source);
-            AbstractResponse body = parseResponse(receive.payload(), req.header);
+            AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now);
             log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body);
             if (req.isInternalRequest && body instanceof MetadataResponse)
                 metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 6489792..0359071 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -666,6 +666,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
             IsolationLevel isolationLevel = IsolationLevel.valueOf(
                     config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
+            Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix);
 
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
@@ -679,7 +680,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                     time,
                     true,
-                    new ApiVersions());
+                    new ApiVersions(),
+                    throttleTimeSensor);
             this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
             OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a79ea5d..01bd0e5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -227,7 +227,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                             }
 
                             sensors.fetchLatency.record(resp.requestLatencyMs());
-                            sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs());
                         }
 
                         @Override
@@ -932,6 +931,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         sensors.updatePartitionLagSensors(assignment);
     }
 
+    public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) {
+        String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX;
+        Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time");
+        fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
+                                                     metricGrpName,
+                                                     "The average throttle time in ms"), new Avg());
+        fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
+                                                     metricGrpName,
+                                                     "The maximum throttle time in ms"), new Max());
+        return fetchThrottleTimeSensor;
+    }
+
     private class PartitionRecords {
         private final TopicPartition partition;
         private final CompletedFetch completedFetch;
@@ -1214,19 +1225,19 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
     }
 
     private static class FetchManagerMetrics {
+        private static final String METRIC_GROUP_SUFFIX = 
"-fetch-manager-metrics";
         private final Metrics metrics;
         private final String metricGrpName;
         private final Sensor bytesFetched;
         private final Sensor recordsFetched;
         private final Sensor fetchLatency;
         private final Sensor recordsFetchLag;
-        private final Sensor fetchThrottleTimeSensor;
 
         private Set<TopicPartition> assignedPartitions;
 
         private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) {
             this.metrics = metrics;
-            this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics";
+            this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX;
 
             this.bytesFetched = metrics.sensor("bytes-fetched");
             this.bytesFetched.add(metrics.metricName("fetch-size-avg",
@@ -1262,15 +1273,6 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
             this.recordsFetchLag.add(metrics.metricName("records-lag-max",
                 this.metricGrpName,
                 "The maximum lag in terms of number of records for any 
partition in this window"), new Max());
-
-            this.fetchThrottleTimeSensor = 
metrics.sensor("fetch-throttle-time");
-            
this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg",
-                                                         this.metricGrpName,
-                                                         "The average throttle 
time in ms"), new Avg());
-
-            
this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max",
-                                                         this.metricGrpName,
-                                                         "The maximum throttle 
time in ms"), new Max());
         }
 
         private void recordTopicFetchMetrics(String topic, int bytes, int 
records) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 22baf3c..4fcbcc8 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -290,6 +290,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 
Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config);
+            Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
             NetworkClient client = new NetworkClient(
                     new 
Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                             this.metrics, time, "producer", channelBuilder),
@@ -303,7 +304,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     this.requestTimeoutMs,
                     time,
                     true,
-                    apiVersions);
+                    apiVersions,
+                    throttleTimeSensor);
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 116a1c5..3fa5903 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -463,7 +463,6 @@ public class Sender implements Runnable {
                     completeBatch(batch, partResp, correlationId, now);
                 }
                 this.sensors.recordLatency(response.destination(), 
response.requestLatencyMs());
-                
this.sensors.recordThrottleTime(produceResponse.getThrottleTime());
             } else {
                 // this is the acks = 0 case, just complete all requests
                 for (ProducerBatch batch : batches.values()) {
@@ -661,11 +660,22 @@ public class Sender implements Runnable {
         this.client.wakeup();
     }
 
+    public static Sensor throttleTimeSensor(Metrics metrics) {
+        String metricGrpName = SenderMetrics.METRIC_GROUP_NAME;
+        Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time");
+        produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg",
+                metricGrpName, "The average throttle time in ms"), new Avg());
+        produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max",
+                metricGrpName, "The maximum throttle time in ms"), new Max());
+        return produceThrottleTimeSensor;
+    }
+
     /**
      * A collection of sensors for the sender
      */
     private class SenderMetrics {
 
+        private static final String METRIC_GROUP_NAME = "producer-metrics";
         private final Metrics metrics;
         public final Sensor retrySensor;
         public final Sensor errorSensor;
@@ -675,12 +685,11 @@ public class Sender implements Runnable {
         public final Sensor batchSizeSensor;
         public final Sensor compressionRateSensor;
         public final Sensor maxRecordSizeSensor;
-        public final Sensor produceThrottleTimeSensor;
         public final Sensor batchSplitSensor;
 
         public SenderMetrics(Metrics metrics) {
             this.metrics = metrics;
-            String metricGrpName = "producer-metrics";
+            String metricGrpName = METRIC_GROUP_NAME;
 
             this.batchSizeSensor = metrics.sensor("batch-size");
             MetricName m = metrics.metricName("batch-size-avg", metricGrpName, 
"The average number of bytes sent per partition per-request.");
@@ -704,12 +713,6 @@ public class Sender implements Runnable {
             m = metrics.metricName("request-latency-max", metricGrpName, "The 
maximum request latency in ms");
             this.requestTimeSensor.add(m, new Max());
 
-            this.produceThrottleTimeSensor = 
metrics.sensor("produce-throttle-time");
-            m = metrics.metricName("produce-throttle-time-avg", metricGrpName, 
"The average throttle time in ms");
-            this.produceThrottleTimeSensor.add(m, new Avg());
-            m = metrics.metricName("produce-throttle-time-max", metricGrpName, 
"The maximum throttle time in ms");
-            this.produceThrottleTimeSensor.add(m, new Max());
-
             this.recordsPerRequestSensor = 
metrics.sensor("records-per-request");
             m = metrics.metricName("record-send-rate", metricGrpName, "The 
average number of records sent per second.");
             this.recordsPerRequestSensor.add(m, new Rate());
@@ -847,10 +850,6 @@ public class Sender implements Runnable {
             }
         }
 
-        public void recordThrottleTime(long throttleTimeMs) {
-            this.produceThrottleTimeSensor.record(throttleTimeMs, 
time.milliseconds());
-        }
-
         void recordBatchSplit() {
             this.batchSplitSensor.record();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 1000ef5..99b35e8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public abstract class AbstractResponse extends AbstractRequestResponse {
+    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final int DEFAULT_THROTTLE_TIME = 0;
 
     public Send toSend(String destination, RequestHeader requestHeader) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
index 754f242..0536636 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class AddOffsetsToTxnResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
index 39172ee..4112b93 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java
@@ -29,7 +29,6 @@ import java.util.List;
 import java.util.Map;
 
 public class AddPartitionsToTxnResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ERRORS_KEY_NAME = "errors";
     private static final String TOPIC_NAME = "topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
index 8f904d8..3a3eb9a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java
@@ -28,8 +28,6 @@ import java.util.Map;
 
 public class AlterConfigsResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-
     private static final String RESOURCES_KEY_NAME = "resources";
     private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
     private static final String RESOURCE_NAME_KEY_NAME = "resource_name";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index d434c75..6f921a7 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -32,7 +32,6 @@ import java.util.Map;
 public class ApiVersionsResponse extends AbstractResponse {
 
     public static final ApiVersionsResponse API_VERSIONS_RESPONSE = 
createApiVersionsResponse(DEFAULT_THROTTLE_TIME);
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String API_VERSIONS_KEY_NAME = "api_versions";
     public static final String API_KEY_NAME = "api_key";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 885981a..c84b97c 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class CreateAclsResponse extends AbstractResponse {
-    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
     private final static String CREATION_RESPONSES = "creation_responses";
     private final static String ERROR_CODE = "error_code";
     private final static String ERROR_MESSAGE = "error_message";
@@ -57,7 +56,7 @@ public class CreateAclsResponse extends AbstractResponse {
     }
 
     public CreateAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.aclCreationResponses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
@@ -75,7 +74,7 @@ public class CreateAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.CREATE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> responseStructs = new ArrayList<>();
         for (AclCreationResponse response : aclCreationResponses) {
             Struct responseStruct = struct.instance(CREATION_RESPONSES);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index e46e7a1..c34265d 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 
 public class CreateTopicsResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
     private static final String TOPIC_KEY_NAME = "topic";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 6fffc0f..341021b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -35,7 +35,6 @@ import java.util.List;
 
 public class DeleteAclsResponse extends AbstractResponse {
     public static final Logger log = 
LoggerFactory.getLogger(DeleteAclsResponse.class);
-    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
     private final static String FILTER_RESPONSES = "filter_responses";
     private final static String ERROR_CODE = "error_code";
     private final static String ERROR_MESSAGE = "error_message";
@@ -97,7 +96,7 @@ public class DeleteAclsResponse extends AbstractResponse {
     }
 
     public DeleteAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         this.responses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
@@ -130,7 +129,7 @@ public class DeleteAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DELETE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         List<Struct> responseStructs = new ArrayList<>();
         for (AclFilterResponse response : responses) {
             Struct responseStruct = struct.instance(FILTER_RESPONSES);

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 64165eb..f19f933 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -33,7 +33,6 @@ public class DeleteRecordsResponse extends AbstractResponse {
     public static final long INVALID_LOW_WATERMARK = -1L;
 
     // request level key names
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPICS_KEY_NAME = "topics";
 
     // topic level key names

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
index 1b80d1c..3f11167 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 
 public class DeleteTopicsResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes";
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index 0de4865..127493b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -33,7 +33,6 @@ import java.util.List;
 import java.util.Map;
 
 public class DescribeAclsResponse extends AbstractResponse {
-    private final static String THROTTLE_TIME_MS = "throttle_time_ms";
     private final static String ERROR_CODE = "error_code";
     private final static String ERROR_MESSAGE = "error_message";
     private final static String RESOURCES = "resources";
@@ -50,7 +49,7 @@ public class DescribeAclsResponse extends AbstractResponse {
     }
 
     public DescribeAclsResponse(Struct struct) {
-        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS);
+        this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
         Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
         if (error != Errors.NONE) {
             this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
@@ -73,7 +72,7 @@ public class DescribeAclsResponse extends AbstractResponse {
     @Override
     protected Struct toStruct(short version) {
         Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
         if (throwable != null) {
             Errors errors = Errors.forException(throwable);
             struct.set(ERROR_CODE, errors.code());

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 05bf88d..8694e1f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -29,8 +29,6 @@ import java.util.Map;
 
 public class DescribeConfigsResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
-
     private static final String RESOURCES_KEY_NAME = "resources";
 
     private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
index bd7a087..0e1d6bd 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java
@@ -29,7 +29,6 @@ import java.util.Map;
 
 public class DescribeGroupsResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String GROUPS_KEY_NAME = "groups";
 
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
index 17cf68d..9083808 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 
 public class EndTxnResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     // Possible error codes:

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 0cb87b5..96fee43 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -43,7 +43,6 @@ public class FetchResponse extends AbstractResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITIONS_KEY_NAME = "partition_responses";
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_HEADER_KEY_NAME = "partition_header";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
index e7df8e8..11eed1d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java
@@ -25,7 +25,6 @@ import java.nio.ByteBuffer;
 
 public class FindCoordinatorResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
     private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
     private static final String COORDINATOR_KEY_NAME = "coordinator";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
index a90212b..cec39f0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 public class HeartbeatResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
index 96e1cdf..da5e6e5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java
@@ -31,7 +31,6 @@ public class InitProducerIdResponse extends AbstractResponse {
     //   TransactionalIdAuthorizationFailed
     //   ClusterAuthorizationFailed
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String PRODUCER_ID_KEY_NAME = "producer_id";
     private static final String EPOCH_KEY_NAME = "producer_epoch";
     private static final String ERROR_CODE_KEY_NAME = "error_code";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
index a1c9e2b..eb86ce7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java
@@ -28,7 +28,6 @@ import java.util.Map;
 
 public class JoinGroupResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
index ccfc8a7..1c85850 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 public class LeaveGroupResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
index 13f589f..8ae3792 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java
@@ -27,7 +27,6 @@ import java.util.List;
 
 public class ListGroupsResponse extends AbstractResponse {
 
-    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String GROUPS_KEY_NAME = "groups";
     public static final String GROUP_ID_KEY_NAME = "group_id";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 61c2a55..7dfaedc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -33,7 +33,6 @@ public class ListOffsetResponse extends AbstractResponse {
     public static final long UNKNOWN_TIMESTAMP = -1L;
     public static final long UNKNOWN_OFFSET = -1L;
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level field names

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index 017fdf4..74e058b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -35,7 +35,6 @@ import java.util.Set;
 
 public class MetadataResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String BROKERS_KEY_NAME = "brokers";
     private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
index d8d647d..06e5608 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
@@ -30,7 +30,6 @@ import java.util.Map;
 
 public class OffsetCommitResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
 
     // topic level fields

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
index f795a5b..6315535 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
@@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.CollectionUtils;
 
 public class OffsetFetchResponse extends AbstractResponse {
 
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String RESPONSES_KEY_NAME = "responses";
     private static final String ERROR_CODE_KEY_NAME = "error_code";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 55332f6..d42f1c6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -39,7 +39,6 @@ public class ProduceResponse extends AbstractResponse {
     // topic level field names
     private static final String TOPIC_KEY_NAME = "topic";
     private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses";
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
 
     // partition level field names
     private static final String PARTITION_KEY_NAME = "partition";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
index b99a99f..c96e21f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java
@@ -24,7 +24,6 @@ import java.nio.ByteBuffer;
 
 public class SyncGroupResponse extends AbstractResponse {
 
-    public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     public static final String ERROR_CODE_KEY_NAME = "error_code";
     public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment";
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
index a62568f..e7b349c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java
@@ -27,7 +27,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class TxnOffsetCommitResponse extends AbstractResponse {
-    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_PARTITIONS_KEY_NAME = "topics";
     private static final String PARTITIONS_KEY_NAME = "partitions";
     private static final String TOPIC_KEY_NAME = "topic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 4e46d57..a81dc58 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -16,8 +16,11 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -40,6 +43,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
@@ -64,10 +69,14 @@ import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
+import org.apache.kafka.common.requests.FetchRequest.PartitionData;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -1033,28 +1042,43 @@ public class FetcherTest {
      */
     @Test
     public void testQuotaMetrics() throws Exception {
-        subscriptions.assignFromUser(singleton(tp1));
-        subscriptions.seek(tp1, 0);
-
-        // normal fetch
-        for (int i = 1; i < 4; i++) {
-            // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
-            // and filtered out by consumer.
-            MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE,
-                    TimestampType.CREATE_TIME, 0L);
-            for (int v = 0; v < 3; v++)
-                builder.appendWithOffset(i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes());
-            List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp1);
-            assertEquals(3, records.size());
+        MockSelector selector = new MockSelector(time);
+        Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics,  "consumer" + groupId);
+        Cluster cluster = TestUtils.singletonCluster("test", 1);
+        Node node = cluster.nodes().get(0);
+        NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                time, true, new ApiVersions(), throttleTimeSensor);
+
+        short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
+        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+        while (!client.ready(node, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        selector.clear();
+
+        for (int i = 1; i <= 3; i++) {
+            int throttleTimeMs = 100 * i;
+            FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<TopicPartition, PartitionData>());
+            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+            client.send(request, time.milliseconds());
+            client.poll(1, time.milliseconds());
+            FetchResponse response = fetchResponse(nextRecords, Errors.NONE, i, throttleTimeMs);
+            buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId()));
+            selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+            client.poll(1, time.milliseconds());
+            selector.clear();
         }
-
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
-        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup));
-        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup));
-        assertEquals(200, avgMetric.value(), EPSILON);
-        assertEquals(300, maxMetric.value(), EPSILON);
+        KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, ""));
+        KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, ""));
+        // Throttle times are ApiVersions=400, Fetch=(100, 200, 300)
+        assertEquals(250, avgMetric.value(), EPSILON);
+        assertEquals(400, maxMetric.value(), EPSILON);
+        client.close();
     }
 
+
     /*
      * Send multiple requests. Verify that the client side quota metrics have the right values
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 719efe9..50c4cd4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Cluster;
@@ -30,6 +32,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionRatioEstimator;
@@ -43,12 +47,16 @@ import org.apache.kafka.common.requests.InitProducerIdRequest;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
+import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.DelayedReceive;
+import org.apache.kafka.test.MockSelector;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -212,18 +220,41 @@ public class SenderTest {
      */
     @Test
     public void testQuotaMetrics() throws Exception {
-        final long offset = 0;
+        MockSelector selector = new MockSelector(time);
+        Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics);
+        Cluster cluster = TestUtils.singletonCluster("test", 1);
+        Node node = cluster.nodes().get(0);
+        NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
+                1000, 1000, 64 * 1024, 64 * 1024, 1000,
+                time, true, new ApiVersions(), throttleTimeSensor);
+
+        short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion();
+        ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0));
+        selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer)));
+        while (!client.ready(node, time.milliseconds()))
+            client.poll(1, time.milliseconds());
+        selector.clear();
+
         for (int i = 1; i <= 3; i++) {
-            accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT);
-            sender.run(time.milliseconds()); // send produce request
-            client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i));
-            sender.run(time.milliseconds());
+            int throttleTimeMs = 100 * i;
+            ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000,
+                            Collections.<TopicPartition, MemoryRecords>emptyMap());
+            ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null);
+            client.send(request, time.milliseconds());
+            client.poll(1, time.milliseconds());
+            ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs);
+            buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId()));
+            selector.completeReceive(new NetworkReceive(node.idString(), buffer));
+            client.poll(1, time.milliseconds());
+            selector.clear();
         }
         Map<MetricName, KafkaMetric> allMetrics = metrics.metrics();
         KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, ""));
         KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, ""));
-        assertEquals(200, avgMetric.value(), EPS);
-        assertEquals(300, maxMetric.value(), EPS);
+        // Throttle times are ApiVersions=400, Produce=(100, 200, 300)
+        assertEquals(250, avgMetric.value(), EPS);
+        assertEquals(400, maxMetric.value(), EPS);
+        client.close();
     }
 
     @Test

Reply via email to