Repository: kafka Updated Branches: refs/heads/trunk 20e200878 -> 73ca0d215
KAFKA-5320: Include all request throttling in client throttle metrics Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #3137 from rajinisivaram/KAFKA-5320 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/73ca0d21 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/73ca0d21 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/73ca0d21 Branch: refs/heads/trunk Commit: 73ca0d215ead9574487744eb89f7ae677a9e13ea Parents: 20e2008 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Thu May 25 20:28:18 2017 +0100 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Thu May 25 20:28:18 2017 +0100 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/clients/NetworkClient.java | 40 +++++++++++-- .../kafka/clients/consumer/KafkaConsumer.java | 4 +- .../clients/consumer/internals/Fetcher.java | 26 +++++---- .../kafka/clients/producer/KafkaProducer.java | 4 +- .../clients/producer/internals/Sender.java | 25 ++++---- .../kafka/common/requests/AbstractResponse.java | 1 + .../requests/AddOffsetsToTxnResponse.java | 1 - .../requests/AddPartitionsToTxnResponse.java | 1 - .../common/requests/AlterConfigsResponse.java | 2 - .../common/requests/ApiVersionsResponse.java | 1 - .../common/requests/CreateAclsResponse.java | 5 +- .../common/requests/CreateTopicsResponse.java | 1 - .../common/requests/DeleteAclsResponse.java | 5 +- .../common/requests/DeleteRecordsResponse.java | 1 - .../common/requests/DeleteTopicsResponse.java | 1 - .../common/requests/DescribeAclsResponse.java | 5 +- .../requests/DescribeConfigsResponse.java | 2 - .../common/requests/DescribeGroupsResponse.java | 1 - .../kafka/common/requests/EndTxnResponse.java | 1 - .../kafka/common/requests/FetchResponse.java | 1 - .../requests/FindCoordinatorResponse.java | 1 - .../common/requests/HeartbeatResponse.java | 1 - .../common/requests/InitProducerIdResponse.java | 1 - .../common/requests/JoinGroupResponse.java | 1 - .../common/requests/LeaveGroupResponse.java | 1 - .../common/requests/ListGroupsResponse.java | 1 - .../common/requests/ListOffsetResponse.java | 1 - .../kafka/common/requests/MetadataResponse.java | 1 - .../common/requests/OffsetCommitResponse.java | 1 - .../common/requests/OffsetFetchResponse.java | 1 - .../kafka/common/requests/ProduceResponse.java | 1 - .../common/requests/SyncGroupResponse.java | 1 - .../requests/TxnOffsetCommitResponse.java | 1 - .../clients/consumer/internals/FetcherTest.java | 60 ++++++++++++++------ .../clients/producer/internals/SenderTest.java | 45 ++++++++++++--- 36 files changed, 156 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 3b865bc..a8d3033 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -8,7 +8,7 @@ <!-- Clients --> <suppress checks="ClassFanOutComplexity" - files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/> + files="(Fetcher|Sender|SenderTest|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest|KafkaAdminClient|NetworkClient).java"/> <suppress checks="ClassFanOutComplexity" files=".*/protocol/Errors.java"/> <suppress checks="ClassFanOutComplexity" http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 93fbb85..bfd0eb5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -19,6 +19,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.errors.UnsupportedVersionException; +import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.network.ChannelState; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.network.Selectable; @@ -104,6 +105,8 @@ public class NetworkClient implements KafkaClient { private final List<ClientResponse> abortedSends = new LinkedList<>(); + private final Sensor throttleTimeSensor; + public NetworkClient(Selectable selector, Metadata metadata, String clientId, @@ -119,7 +122,27 @@ public class NetworkClient implements KafkaClient { this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, - discoverBrokerVersions, apiVersions); + discoverBrokerVersions, apiVersions, null); + } + + public NetworkClient(Selectable selector, + Metadata metadata, + String clientId, + int maxInFlightRequestsPerConnection, + long reconnectBackoffMs, + long reconnectBackoffMax, + int socketSendBuffer, + int socketReceiveBuffer, + int requestTimeoutMs, + Time time, + boolean discoverBrokerVersions, + ApiVersions apiVersions, + Sensor throttleTimeSensor) { + + this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, + reconnectBackoffMs, reconnectBackoffMax, + socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, + discoverBrokerVersions, apiVersions, throttleTimeSensor); } public NetworkClient(Selectable selector, @@ -137,7 +160,7 @@ public class NetworkClient implements KafkaClient { this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, reconnectBackoffMax, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time, - discoverBrokerVersions, apiVersions); + discoverBrokerVersions, apiVersions, null); } private NetworkClient(MetadataUpdater metadataUpdater, @@ -152,7 +175,8 @@ public class NetworkClient implements KafkaClient { int requestTimeoutMs, Time time, boolean discoverBrokerVersions, - ApiVersions apiVersions) { + ApiVersions apiVersions, + Sensor throttleTimeSensor) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the * super constructor is invoked. @@ -177,6 +201,7 @@ public class NetworkClient implements KafkaClient { this.time = time; this.discoverBrokerVersions = discoverBrokerVersions; this.apiVersions = apiVersions; + this.throttleTimeSensor = throttleTimeSensor; } /** @@ -480,11 +505,18 @@ public class NetworkClient implements KafkaClient { } public static AbstractResponse parseResponse(ByteBuffer responseBuffer, RequestHeader requestHeader) { + return parseResponseMaybeUpdateThrottleTimeMetrics(responseBuffer, requestHeader, null, 0); + } + + private static AbstractResponse parseResponseMaybeUpdateThrottleTimeMetrics(ByteBuffer responseBuffer, RequestHeader requestHeader, + Sensor throttleTimeSensor, long now) { ResponseHeader responseHeader = ResponseHeader.parse(responseBuffer); // Always expect the response version id to be the same as the request version id ApiKeys apiKey = ApiKeys.forId(requestHeader.apiKey()); Struct responseBody = apiKey.parseResponse(requestHeader.apiVersion(), responseBuffer); correlate(requestHeader, responseHeader); + if (throttleTimeSensor != null && responseBody.hasField(AbstractResponse.THROTTLE_TIME_KEY_NAME)) + throttleTimeSensor.record(responseBody.getInt(AbstractResponse.THROTTLE_TIME_KEY_NAME), now); return AbstractResponse.getResponse(apiKey, responseBody); } @@ -572,7 +604,7 @@ public class NetworkClient implements KafkaClient { for (NetworkReceive receive : this.selector.completedReceives()) { String source = receive.source(); InFlightRequest req = inFlightRequests.completeNext(source); - AbstractResponse body = parseResponse(receive.payload(), req.header); + AbstractResponse body = parseResponseMaybeUpdateThrottleTimeMetrics(receive.payload(), req.header, throttleTimeSensor, now); log.trace("Completed receive from node {}, for key {}, received {}", req.destination, req.header.apiKey(), body); if (req.isInternalRequest && body instanceof MetadataResponse) metadataUpdater.handleCompletedMetadataResponse(req.header, now, (MetadataResponse) body); http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 6489792..0359071 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -666,6 +666,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { IsolationLevel isolationLevel = IsolationLevel.valueOf( config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT)); + Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricGrpPrefix); NetworkClient netClient = new NetworkClient( new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder), @@ -679,7 +680,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time, true, - new ApiVersions()); + new ApiVersions(), + throttleTimeSensor); this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs, config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT)); http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index a79ea5d..01bd0e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -227,7 +227,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } sensors.fetchLatency.record(resp.requestLatencyMs()); - sensors.fetchThrottleTimeSensor.record(response.throttleTimeMs()); } @Override @@ -932,6 +931,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { sensors.updatePartitionLagSensors(assignment); } + public static Sensor throttleTimeSensor(Metrics metrics, String metricGrpPrefix) { + String metricGrpName = metricGrpPrefix + FetchManagerMetrics.METRIC_GROUP_SUFFIX; + Sensor fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); + fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg", + metricGrpName, + "The average throttle time in ms"), new Avg()); + fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max", + metricGrpName, + "The maximum throttle time in ms"), new Max()); + return fetchThrottleTimeSensor; + } + private class PartitionRecords { private final TopicPartition partition; private final CompletedFetch completedFetch; @@ -1214,19 +1225,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { } private static class FetchManagerMetrics { + private static final String METRIC_GROUP_SUFFIX = "-fetch-manager-metrics"; private final Metrics metrics; private final String metricGrpName; private final Sensor bytesFetched; private final Sensor recordsFetched; private final Sensor fetchLatency; private final Sensor recordsFetchLag; - private final Sensor fetchThrottleTimeSensor; private Set<TopicPartition> assignedPartitions; private FetchManagerMetrics(Metrics metrics, String metricGrpPrefix) { this.metrics = metrics; - this.metricGrpName = metricGrpPrefix + "-fetch-manager-metrics"; + this.metricGrpName = metricGrpPrefix + METRIC_GROUP_SUFFIX; this.bytesFetched = metrics.sensor("bytes-fetched"); this.bytesFetched.add(metrics.metricName("fetch-size-avg", @@ -1262,15 +1273,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { this.recordsFetchLag.add(metrics.metricName("records-lag-max", this.metricGrpName, "The maximum lag in terms of number of records for any partition in this window"), new Max()); - - this.fetchThrottleTimeSensor = metrics.sensor("fetch-throttle-time"); - this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-avg", - this.metricGrpName, - "The average throttle time in ms"), new Avg()); - - this.fetchThrottleTimeSensor.add(metrics.metricName("fetch-throttle-time-max", - this.metricGrpName, - "The maximum throttle time in ms"), new Max()); } private void recordTopicFetchMetrics(String topic, int bytes, int records) { http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 22baf3c..4fcbcc8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -290,6 +290,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); + Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), @@ -303,7 +304,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.requestTimeoutMs, time, true, - apiVersions); + apiVersions, + throttleTimeSensor); this.sender = new Sender(client, this.metadata, this.accumulator, http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 116a1c5..3fa5903 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -463,7 +463,6 @@ public class Sender implements Runnable { completeBatch(batch, partResp, correlationId, now); } this.sensors.recordLatency(response.destination(), response.requestLatencyMs()); - this.sensors.recordThrottleTime(produceResponse.getThrottleTime()); } else { // this is the acks = 0 case, just complete all requests for (ProducerBatch batch : batches.values()) { @@ -661,11 +660,22 @@ public class Sender implements Runnable { this.client.wakeup(); } + public static Sensor throttleTimeSensor(Metrics metrics) { + String metricGrpName = SenderMetrics.METRIC_GROUP_NAME; + Sensor produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); + produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-avg", + metricGrpName, "The average throttle time in ms"), new Avg()); + produceThrottleTimeSensor.add(metrics.metricName("produce-throttle-time-max", + metricGrpName, "The maximum throttle time in ms"), new Max()); + return produceThrottleTimeSensor; + } + /** * A collection of sensors for the sender */ private class SenderMetrics { + private static final String METRIC_GROUP_NAME = "producer-metrics"; private final Metrics metrics; public final Sensor retrySensor; public final Sensor errorSensor; @@ -675,12 +685,11 @@ public class Sender implements Runnable { public final Sensor batchSizeSensor; public final Sensor compressionRateSensor; public final Sensor maxRecordSizeSensor; - public final Sensor produceThrottleTimeSensor; public final Sensor batchSplitSensor; public SenderMetrics(Metrics metrics) { this.metrics = metrics; - String metricGrpName = "producer-metrics"; + String metricGrpName = METRIC_GROUP_NAME; this.batchSizeSensor = metrics.sensor("batch-size"); MetricName m = metrics.metricName("batch-size-avg", metricGrpName, "The average number of bytes sent per partition per-request."); @@ -704,12 +713,6 @@ public class Sender implements Runnable { m = metrics.metricName("request-latency-max", metricGrpName, "The maximum request latency in ms"); this.requestTimeSensor.add(m, new Max()); - this.produceThrottleTimeSensor = metrics.sensor("produce-throttle-time"); - m = metrics.metricName("produce-throttle-time-avg", metricGrpName, "The average throttle time in ms"); - this.produceThrottleTimeSensor.add(m, new Avg()); - m = metrics.metricName("produce-throttle-time-max", metricGrpName, "The maximum throttle time in ms"); - this.produceThrottleTimeSensor.add(m, new Max()); - this.recordsPerRequestSensor = metrics.sensor("records-per-request"); m = metrics.metricName("record-send-rate", metricGrpName, "The average number of records sent per second."); this.recordsPerRequestSensor.add(m, new Rate()); @@ -847,10 +850,6 @@ public class Sender implements Runnable { } } - public void recordThrottleTime(long throttleTimeMs) { - this.produceThrottleTimeSensor.record(throttleTimeMs, time.milliseconds()); - } - void recordBatchSplit() { this.batchSplitSensor.record(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index 1000ef5..99b35e8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -24,6 +24,7 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public abstract class AbstractResponse extends AbstractRequestResponse { + public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final int DEFAULT_THROTTLE_TIME = 0; public Send toSend(String destination, RequestHeader requestHeader) { http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java index 754f242..0536636 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddOffsetsToTxnResponse.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class AddOffsetsToTxnResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; // Possible error codes: http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java index 39172ee..4112b93 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AddPartitionsToTxnResponse.java @@ -29,7 +29,6 @@ import java.util.List; import java.util.Map; public class AddPartitionsToTxnResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String ERRORS_KEY_NAME = "errors"; private static final String TOPIC_NAME = "topic"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java index 8f904d8..3a3eb9a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsResponse.java @@ -28,8 +28,6 @@ import java.util.Map; public class AlterConfigsResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; - private static final String RESOURCES_KEY_NAME = "resources"; private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java index d434c75..6f921a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java @@ -32,7 +32,6 @@ import java.util.Map; public class ApiVersionsResponse extends AbstractResponse { public static final ApiVersionsResponse API_VERSIONS_RESPONSE = createApiVersionsResponse(DEFAULT_THROTTLE_TIME); - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String API_VERSIONS_KEY_NAME = "api_versions"; public static final String API_KEY_NAME = "api_key"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java index 885981a..c84b97c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java @@ -25,7 +25,6 @@ import java.util.ArrayList; import java.util.List; public class CreateAclsResponse extends AbstractResponse { - private final static String THROTTLE_TIME_MS = "throttle_time_ms"; private final static String CREATION_RESPONSES = "creation_responses"; private final static String ERROR_CODE = "error_code"; private final static String ERROR_MESSAGE = "error_message"; @@ -57,7 +56,7 @@ public class CreateAclsResponse extends AbstractResponse { } public CreateAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS); + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); this.aclCreationResponses = new ArrayList<>(); for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) { Struct responseStruct = (Struct) responseStructObj; @@ -75,7 +74,7 @@ public class CreateAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.CREATE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> responseStructs = new ArrayList<>(); for (AclCreationResponse response : aclCreationResponses) { Struct responseStruct = struct.instance(CREATION_RESPONSES); http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index e46e7a1..c34265d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; public class CreateTopicsResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors"; private static final String TOPIC_KEY_NAME = "topic"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 6fffc0f..341021b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -35,7 +35,6 @@ import java.util.List; public class DeleteAclsResponse extends AbstractResponse { public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class); - private final static String THROTTLE_TIME_MS = "throttle_time_ms"; private final static String FILTER_RESPONSES = "filter_responses"; private final static String ERROR_CODE = "error_code"; private final static String ERROR_MESSAGE = "error_message"; @@ -97,7 +96,7 @@ public class DeleteAclsResponse extends AbstractResponse { } public DeleteAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS); + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); this.responses = new ArrayList<>(); for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) { Struct responseStruct = (Struct) responseStructObj; @@ -130,7 +129,7 @@ public class DeleteAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DELETE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> responseStructs = new ArrayList<>(); for (AclFilterResponse response : responses) { Struct responseStruct = struct.instance(FILTER_RESPONSES); http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java index 64165eb..f19f933 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java @@ -33,7 +33,6 @@ public class DeleteRecordsResponse extends AbstractResponse { public static final long INVALID_LOW_WATERMARK = -1L; // request level key names - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPICS_KEY_NAME = "topics"; // topic level key names http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java index 1b80d1c..3f11167 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; public class DeleteTopicsResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes"; private static final String TOPIC_KEY_NAME = "topic"; private static final String ERROR_CODE_KEY_NAME = "error_code"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java index 0de4865..127493b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -33,7 +33,6 @@ import java.util.List; import java.util.Map; public class DescribeAclsResponse extends AbstractResponse { - private final static String THROTTLE_TIME_MS = "throttle_time_ms"; private final static String ERROR_CODE = "error_code"; private final static String ERROR_MESSAGE = "error_message"; private final static String RESOURCES = "resources"; @@ -50,7 +49,7 @@ public class DescribeAclsResponse extends AbstractResponse { } public DescribeAclsResponse(Struct struct) { - this.throttleTimeMs = struct.getInt(THROTTLE_TIME_MS); + this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); Errors error = Errors.forCode(struct.getShort(ERROR_CODE)); if (error != Errors.NONE) { this.throwable = error.exception(struct.getString(ERROR_MESSAGE)); @@ -73,7 +72,7 @@ public class DescribeAclsResponse extends AbstractResponse { @Override protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); if (throwable != null) { Errors errors = Errors.forException(throwable); struct.set(ERROR_CODE, errors.code()); http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java index 05bf88d..8694e1f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -29,8 +29,6 @@ import java.util.Map; public class DescribeConfigsResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; - private static final String RESOURCES_KEY_NAME = "resources"; private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java index bd7a087..0e1d6bd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeGroupsResponse.java @@ -29,7 +29,6 @@ import java.util.Map; public class DescribeGroupsResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String GROUPS_KEY_NAME = "groups"; private static final String ERROR_CODE_KEY_NAME = "error_code"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java index 17cf68d..9083808 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnResponse.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; public class EndTxnResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; // Possible error codes: http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java index 0cb87b5..96fee43 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java @@ -43,7 +43,6 @@ public class FetchResponse extends AbstractResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITIONS_KEY_NAME = "partition_responses"; - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_HEADER_KEY_NAME = "partition_header"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java index e7df8e8..11eed1d 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/FindCoordinatorResponse.java @@ -25,7 +25,6 @@ import java.nio.ByteBuffer; public class FindCoordinatorResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; private static final String COORDINATOR_KEY_NAME = "coordinator"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java index a90212b..cec39f0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; public class HeartbeatResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java index 96e1cdf..da5e6e5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/InitProducerIdResponse.java @@ -31,7 +31,6 @@ public class InitProducerIdResponse extends AbstractResponse { // TransactionalIdAuthorizationFailed // ClusterAuthorizationFailed - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String PRODUCER_ID_KEY_NAME = "producer_id"; private static final String EPOCH_KEY_NAME = "producer_epoch"; private static final String ERROR_CODE_KEY_NAME = "error_code"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java index a1c9e2b..eb86ce7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java @@ -28,7 +28,6 @@ import java.util.Map; public class JoinGroupResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java index ccfc8a7..1c85850 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/LeaveGroupResponse.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; public class LeaveGroupResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String ERROR_CODE_KEY_NAME = "error_code"; /** http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java index 13f589f..8ae3792 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListGroupsResponse.java @@ -27,7 +27,6 @@ import java.util.List; public class ListGroupsResponse extends AbstractResponse { - public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String GROUPS_KEY_NAME = "groups"; public static final String GROUP_ID_KEY_NAME = "group_id"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java index 61c2a55..7dfaedc 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java @@ -33,7 +33,6 @@ public class ListOffsetResponse extends AbstractResponse { public static final long UNKNOWN_TIMESTAMP = -1L; public static final long UNKNOWN_OFFSET = -1L; - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String RESPONSES_KEY_NAME = "responses"; // topic level field names http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 017fdf4..74e058b 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -35,7 +35,6 @@ import java.util.Set; public class MetadataResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String BROKERS_KEY_NAME = "brokers"; private static final String TOPIC_METADATA_KEY_NAME = "topic_metadata"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java index d8d647d..06e5608 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java @@ -30,7 +30,6 @@ import java.util.Map; public class OffsetCommitResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String RESPONSES_KEY_NAME = "responses"; // topic level fields http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java index f795a5b..6315535 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java @@ -32,7 +32,6 @@ import org.apache.kafka.common.utils.CollectionUtils; public class OffsetFetchResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String RESPONSES_KEY_NAME = "responses"; private static final String ERROR_CODE_KEY_NAME = "error_code"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index 55332f6..d42f1c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -39,7 +39,6 @@ public class ProduceResponse extends AbstractResponse { // topic level field names private static final String TOPIC_KEY_NAME = "topic"; private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; // partition level field names private static final String PARTITION_KEY_NAME = "partition"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java index b99a99f..c96e21f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/SyncGroupResponse.java @@ -24,7 +24,6 @@ import java.nio.ByteBuffer; public class SyncGroupResponse extends AbstractResponse { - public static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; public static final String ERROR_CODE_KEY_NAME = "error_code"; public static final String MEMBER_ASSIGNMENT_KEY_NAME = "member_assignment"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index a62568f..e7b349c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.Map; public class TxnOffsetCommitResponse extends AbstractResponse { - private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_PARTITIONS_KEY_NAME = "topics"; private static final String PARTITIONS_KEY_NAME = "partitions"; private static final String TOPIC_KEY_NAME = "topic"; http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 4e46d57..a81dc58 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -16,8 +16,11 @@ */ package org.apache.kafka.clients.consumer.internals; +import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -40,6 +43,8 @@ import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionType; @@ -64,10 +69,14 @@ import org.apache.kafka.common.requests.ListOffsetRequest; import org.apache.kafka.common.requests.ListOffsetResponse; import org.apache.kafka.common.requests.MetadataRequest; import org.apache.kafka.common.requests.MetadataResponse; +import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.requests.FetchRequest.PartitionData; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -1033,28 +1042,43 @@ public class FetcherTest { */ @Test public void testQuotaMetrics() throws Exception { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); - - // normal fetch - for (int i = 1; i < 4; i++) { - // We need to make sure the message offset grows. Otherwise they will be considered as already consumed - // and filtered out by consumer. - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, - TimestampType.CREATE_TIME, 0L); - for (int v = 0; v < 3; v++) - builder.appendWithOffset(i * 3 + v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes()); - List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE, 100L, 100 * i).get(tp1); - assertEquals(3, records.size()); + MockSelector selector = new MockSelector(time); + Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, "consumer" + groupId); + Cluster cluster = TestUtils.singletonCluster("test", 1); + Node node = cluster.nodes().get(0); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + 1000, 1000, 64 * 1024, 64 * 1024, 1000, + time, true, new ApiVersions(), throttleTimeSensor); + + short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion(); + ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); + selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); + while (!client.ready(node, time.milliseconds())) + client.poll(1, time.milliseconds()); + selector.clear(); + + for (int i = 1; i <= 3; i++) { + int throttleTimeMs = 100 * i; + FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<TopicPartition, PartitionData>()); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); + client.send(request, time.milliseconds()); + client.poll(1, time.milliseconds()); + FetchResponse response = fetchResponse(nextRecords, Errors.NONE, i, throttleTimeMs); + buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId())); + selector.completeReceive(new NetworkReceive(node.idString(), buffer)); + client.poll(1, time.milliseconds()); + selector.clear(); } - Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); - KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup)); - KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup)); - assertEquals(200, avgMetric.value(), EPSILON); - assertEquals(300, maxMetric.value(), EPSILON); + KafkaMetric avgMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-avg", metricGroup, "")); + KafkaMetric maxMetric = allMetrics.get(metrics.metricName("fetch-throttle-time-max", metricGroup, "")); + // Throttle times are ApiVersions=400, Fetch=(100, 200, 300) + assertEquals(250, avgMetric.value(), EPSILON); + assertEquals(400, maxMetric.value(), EPSILON); + client.close(); } + /* * Send multiple requests. Verify that the client side quota metrics have the right values */ http://git-wip-us.apache.org/repos/asf/kafka/blob/73ca0d21/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 719efe9..50c4cd4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -17,8 +17,10 @@ package org.apache.kafka.clients.producer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; +import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.Cluster; @@ -30,6 +32,8 @@ import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.Sensor; +import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.CompressionRatioEstimator; @@ -43,12 +47,16 @@ import org.apache.kafka.common.requests.InitProducerIdRequest; import org.apache.kafka.common.requests.ProduceRequest; import org.apache.kafka.common.requests.InitProducerIdResponse; import org.apache.kafka.common.requests.ProduceResponse; +import org.apache.kafka.common.requests.ResponseHeader; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.test.DelayedReceive; +import org.apache.kafka.test.MockSelector; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; import org.junit.Test; +import java.nio.ByteBuffer; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; @@ -212,18 +220,41 @@ public class SenderTest { */ @Test public void testQuotaMetrics() throws Exception { - final long offset = 0; + MockSelector selector = new MockSelector(time); + Sensor throttleTimeSensor = Sender.throttleTimeSensor(metrics); + Cluster cluster = TestUtils.singletonCluster("test", 1); + Node node = cluster.nodes().get(0); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + 1000, 1000, 64 * 1024, 64 * 1024, 1000, + time, true, new ApiVersions(), throttleTimeSensor); + + short apiVersionsResponseVersion = ApiKeys.API_VERSIONS.latestVersion(); + ByteBuffer buffer = ApiVersionsResponse.createApiVersionsResponse(400).serialize(apiVersionsResponseVersion, new ResponseHeader(0)); + selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), buffer))); + while (!client.ready(node, time.milliseconds())) + client.poll(1, time.milliseconds()); + selector.clear(); + for (int i = 1; i <= 3; i++) { - accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT); - sender.run(time.milliseconds()); // send produce request - client.respond(produceResponse(tp0, offset, Errors.NONE, 100 * i)); - sender.run(time.milliseconds()); + int throttleTimeMs = 100 * i; + ProduceRequest.Builder builder = new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 1000, + Collections.<TopicPartition, MemoryRecords>emptyMap()); + ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); + client.send(request, time.milliseconds()); + client.poll(1, time.milliseconds()); + ProduceResponse response = produceResponse(tp0, i, Errors.NONE, throttleTimeMs); + buffer = response.serialize(ApiKeys.PRODUCE.latestVersion(), new ResponseHeader(request.correlationId())); + selector.completeReceive(new NetworkReceive(node.idString(), buffer)); + client.poll(1, time.milliseconds()); + selector.clear(); } Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric avgMetric = allMetrics.get(metrics.metricName("produce-throttle-time-avg", METRIC_GROUP, "")); KafkaMetric maxMetric = allMetrics.get(metrics.metricName("produce-throttle-time-max", METRIC_GROUP, "")); - assertEquals(200, avgMetric.value(), EPS); - assertEquals(300, maxMetric.value(), EPS); + // Throttle times are ApiVersions=400, Produce=(100, 200, 300) + assertEquals(250, avgMetric.value(), EPS); + assertEquals(400, maxMetric.value(), EPS); + client.close(); } @Test