Repository: kafka Updated Branches: refs/heads/trunk a565a77b1 -> b565dd7eb
KAFKA-4429; Consumer lag metric should be zero if FetchResponse is empty Author: Dong Lin <lindon...@gmail.com> Reviewers: Ewen Cheslack-Postava <m...@ewencp.org>, Jason Gustafson <ja...@confluent.io> Closes #2155 from lindong28/KAFKA-4429 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b565dd7e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b565dd7e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b565dd7e Branch: refs/heads/trunk Commit: b565dd7eb184da5ef3b08c88a8acc3df221aaa08 Parents: a565a77 Author: Dong Lin <lindon...@gmail.com> Authored: Tue Jan 3 13:54:40 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Tue Jan 3 13:54:40 2017 -0800 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 3 ++ .../clients/consumer/internals/FetcherTest.java | 49 +++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b565dd7e/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 526b0a9..22588a8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -701,6 +701,9 @@ public class Fetcher<K, V> { parsedRecords = new PartitionRecords<>(fetchOffset, tp, parsed); ConsumerRecord<K, V> record = parsed.get(parsed.size() - 1); this.sensors.recordsFetchLag.record(partition.highWatermark - record.offset()); + } else if (partition.highWatermark >= 0) { + log.trace("Received empty fetch response for partition {} with offset {}", tp, position); + this.sensors.recordsFetchLag.record(partition.highWatermark - fetchOffset); } } else if (error == Errors.NOT_LEADER_FOR_PARTITION) { log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName()); http://git-wip-us.apache.org/repos/asf/kafka/blob/b565dd7e/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 272a5ee..0095697 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -607,7 +607,6 @@ public class FetcherTest { */ @Test public void testQuotaMetrics() throws Exception { - List<ConsumerRecord<byte[], byte[]>> records; subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); @@ -615,17 +614,10 @@ public class FetcherTest { for (int i = 1; i < 4; i++) { // We need to make sure the message offset grows. Otherwise they will be considered as already consumed // and filtered out by consumer. - if (i > 1) { - MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); - for (int v = 0; v < 3; v++) { - builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); - } - this.records = builder.build(); - } - assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(this.records, Errors.NONE.code(), 100L, 100 * i)); - consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp); + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + for (int v = 0; v < 3; v++) + builder.appendWithOffset((long) i * 3 + v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + List<ConsumerRecord<byte[], byte[]>> records = fetchRecords(builder.build(), Errors.NONE.code(), 100L, 100 * i).get(tp); assertEquals(3, records.size()); } @@ -636,6 +628,39 @@ public class FetcherTest { assertEquals(300, maxMetric.value(), EPSILON); } + /* + * Send multiple requests. Verify that the client side quota metrics have the right values + */ + @Test + public void testFetcherMetrics() { + subscriptions.assignFromUser(singleton(tp)); + subscriptions.seek(tp, 0); + + Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); + KafkaMetric recordsFetchLagMax = allMetrics.get(metrics.metricName("records-lag-max", metricGroup, "")); + + // recordsFetchLagMax should be initialized to negative infinity + assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); + + // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse + fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0); + assertEquals(100, recordsFetchLagMax.value(), EPSILON); + + // recordsFetchLagMax should be hw - offset of the last message after receiving a non-empty FetchResponse + MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME); + for (int v = 0; v < 3; v++) + builder.appendWithOffset((long) v, Record.NO_TIMESTAMP, "key".getBytes(), String.format("value-%d", v).getBytes()); + fetchRecords(builder.build(), Errors.NONE.code(), 200L, 0); + assertEquals(198, recordsFetchLagMax.value(), EPSILON); + } + + private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) { + assertEquals(1, fetcher.sendFetches()); + client.prepareResponse(fetchResponse(records, error, hw, throttleTime)); + consumerClient.poll(0); + return fetcher.fetchedRecords(); + } + @Test public void testGetOffsetsForTimesTimeout() { try {