Repository: kafka Updated Branches: refs/heads/trunk cdff011fe -> 5bb53e034
KAFKA-5534; KafkaConsumer `offsetsForTimes` result should include partitions with no offset For topics that support timestamp search, if no offset is found for a partition, the partition should still be included in the result with a `null` offset value. This `KafkaConsumer` method currently excludes such partitions from the result. Author: Vahid Hashemian <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #3460 from vahidhashemian/KAFKA-5534 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5bb53e03 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5bb53e03 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5bb53e03 Branch: refs/heads/trunk Commit: 5bb53e034e4f8a06550dd06377fae7b3c2137ce2 Parents: cdff011 Author: Vahid Hashemian <[email protected]> Authored: Thu Jul 20 17:31:24 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Thu Jul 20 17:38:30 2017 -0700 ---------------------------------------------------------------------- .../clients/consumer/internals/Fetcher.java | 22 +- .../clients/consumer/internals/FetcherTest.java | 590 ++++++++++--------- .../kafka/admin/ConsumerGroupCommand.scala | 2 +- .../kafka/api/LegacyAdminClientTest.scala | 6 + docs/upgrade.html | 2 + 5 files changed, 327 insertions(+), 295 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 2703823..f5ebac5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -429,14 +429,17 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, long timeout) { Map<TopicPartition, OffsetData> offsetData = retrieveOffsetsByTimes(timestampsToSearch, timeout, true); - HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(offsetData.size()); + + HashMap<TopicPartition, OffsetAndTimestamp> offsetsByTimes = new HashMap<>(timestampsToSearch.size()); + for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) + offsetsByTimes.put(entry.getKey(), null); + for (Map.Entry<TopicPartition, OffsetData> entry : offsetData.entrySet()) { - OffsetData data = entry.getValue(); - if (data == null) - offsetsByTimes.put(entry.getKey(), null); - else - offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(data.offset, data.timestamp)); + // 'entry.getValue().timestamp' will not be null since we are guaranteed + // to work with a v1 (or later) ListOffset request + offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(entry.getValue().offset, entry.getValue().timestamp)); } + return offsetsByTimes; } @@ -683,8 +686,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { * @param listOffsetResponse The response from the server. * @param future The future to be completed when the response returns. Note that any partition-level errors will * generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT, - * which indicates that the broker does not support the v1 message format and results in a null - * being inserted into the resulting map. + * which indicates that the broker does not support the v1 message format. Partitions with this + * particular error are simply left out of the future map. 
Note that the corresponding timestamp + * value of each partition may be null only for v0. In v1 and later the ListOffset API would not + * return a null timestamp (-1 is returned instead when necessary). */ @SuppressWarnings("deprecation") private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch, @@ -727,7 +732,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { // The message format on the broker side is before 0.10.0, we simply put null in the response. log.debug("Cannot search by timestamp for partition {} because the message format version " + "is before 0.10.0", topicPartition); - timestampOffsetMap.put(topicPartition, null); } else if (error == Errors.NOT_LEADER_FOR_PARTITION) { log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 0801979..c0edcfd 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -111,8 +111,8 @@ public class FetcherTest { private String topicName = "test"; private String groupId = "test-group"; private final String metricGroup = "consumer" + groupId + "-fetch-manager-metrics"; - private TopicPartition tp1 = new TopicPartition(topicName, 0); - private TopicPartition tp2 = new TopicPartition(topicName, 1); + private TopicPartition tp0 = new TopicPartition(topicName, 0); + private TopicPartition tp1 = new TopicPartition(topicName, 1); private int minBytes = 1; private int maxBytes = 
Integer.MAX_VALUE; private int maxWaitMs = 0; @@ -164,23 +164,23 @@ public class FetcherTest { @Test public void testFetchNormal() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords(); - assertTrue(partitionRecords.containsKey(tp1)); + assertTrue(partitionRecords.containsKey(tp0)); - List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1); + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0); assertEquals(3, records.size()); - assertEquals(4L, subscriptions.position(tp1).longValue()); // this is the next fetching position + assertEquals(4L, subscriptions.position(tp0).longValue()); // this is the next fetching position long offset = 1; for (ConsumerRecord<byte[], byte[]> record : records) { assertEquals(offset, record.offset()); @@ -190,8 +190,8 @@ public class FetcherTest { @Test public void testFetcherIgnoresControlRecords() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -213,16 +213,16 @@ public class FetcherTest { buffer.flip(); - client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, 
List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords(); - assertTrue(partitionRecords.containsKey(tp1)); + assertTrue(partitionRecords.containsKey(tp0)); - List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1); + List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp0); assertEquals(1, records.size()); - assertEquals(2L, subscriptions.position(tp1).longValue()); + assertEquals(2L, subscriptions.position(tp0).longValue()); ConsumerRecord<byte[], byte[]> record = records.get(0); assertArrayEquals("key".getBytes(), record.key()); @@ -230,18 +230,18 @@ public class FetcherTest { @Test public void testFetchError() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetcher.fetchedRecords(); - assertFalse(partitionRecords.containsKey(tp1)); + assertFalse(partitionRecords.containsKey(tp0)); } private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) { @@ -274,10 +274,10 @@ public class FetcherTest { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 1); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, 
Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); @@ -288,7 +288,7 @@ public class FetcherTest { fail("fetchedRecords should have raised"); } catch (SerializationException e) { // the position should not advance since no data has been returned - assertEquals(1, subscriptions.position(tp1).longValue()); + assertEquals(1, subscriptions.position(tp0).longValue()); } } } @@ -325,17 +325,17 @@ public class FetcherTest { buffer.flip(); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); // the first fetchedRecords() should return the first valid message - assertEquals(1, fetcher.fetchedRecords().get(tp1).size()); - assertEquals(1, subscriptions.position(tp1).longValue()); + assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); + assertEquals(1, subscriptions.position(tp0).longValue()); // the fetchedRecords() should always throw exception due to the second invalid message for (int i = 0; i < 2; i++) { @@ -343,22 +343,22 @@ public class FetcherTest { fetcher.fetchedRecords(); fail("fetchedRecords should have raised KafkaException"); } catch (KafkaException e) { - assertEquals(1, subscriptions.position(tp1).longValue()); + assertEquals(1, subscriptions.position(tp0).longValue()); } } // Seek to skip the bad record and fetch again. - subscriptions.seek(tp1, 2); + subscriptions.seek(tp0, 2); // Should not throw exception after the seek. 
fetcher.fetchedRecords(); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); - List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp1); + List<ConsumerRecord<byte[], byte[]>> records = fetcher.fetchedRecords().get(tp0); assertEquals(1, records.size()); assertEquals(2L, records.get(0).offset()); - assertEquals(3, subscriptions.position(tp1).longValue()); + assertEquals(3, subscriptions.position(tp0).longValue()); } @Test @@ -380,12 +380,12 @@ public class FetcherTest { buffer.put("beef".getBytes()); buffer.position(0); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); // the fetchedRecords() should always throw exception due to the bad batch. 
@@ -394,7 +394,7 @@ public class FetcherTest { fetcher.fetchedRecords(); fail("fetchedRecords should have raised KafkaException"); } catch (KafkaException e) { - assertEquals(0, subscriptions.position(tp1).longValue()); + assertEquals(0, subscriptions.position(tp0).longValue()); } } } @@ -411,19 +411,19 @@ public class FetcherTest { // flip some bits to fail the crc buffer.putInt(32, buffer.get(32) ^ 87238423); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, MemoryRecords.readableRecords(buffer), Errors.NONE, 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); fail("fetchedRecords should have raised"); } catch (KafkaException e) { // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp1).longValue()); + assertEquals(0, subscriptions.position(tp0).longValue()); } } @@ -446,14 +446,14 @@ public class FetcherTest { MemoryRecords memoryRecords = builder.build(); List<ConsumerRecord<byte[], byte[]>> records; - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 1); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, memoryRecords, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, memoryRecords, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp1); + records = fetcher.fetchedRecords().get(tp0); assertEquals(3, records.size()); @@ -476,32 +476,32 @@ public class FetcherTest { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2); 
List<ConsumerRecord<byte[], byte[]>> records; - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 1); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 1); - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); - client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp0, 4), fetchResponse(tp0, this.nextRecords, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp1); + records = fetcher.fetchedRecords().get(tp0); assertEquals(2, records.size()); - assertEquals(3L, subscriptions.position(tp1).longValue()); + assertEquals(3L, subscriptions.position(tp0).longValue()); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); assertEquals(0, fetcher.sendFetches()); consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp1); + records = fetcher.fetchedRecords().get(tp0); assertEquals(1, records.size()); - assertEquals(4L, subscriptions.position(tp1).longValue()); + assertEquals(4L, subscriptions.position(tp0).longValue()); assertEquals(3, records.get(0).offset()); assertTrue(fetcher.sendFetches() > 0); consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp1); + records = fetcher.fetchedRecords().get(tp0); assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp1).longValue()); + assertEquals(6L, subscriptions.position(tp0).longValue()); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -516,31 +516,31 @@ public class FetcherTest { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(time), 2); List<ConsumerRecord<byte[], byte[]>> records; - 
subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 1); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 1); // Returns 3 records while `max.poll.records` is configured to 2 - client.prepareResponse(matchesOffset(tp1, 1), fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(matchesOffset(tp0, 1), fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); - records = fetcher.fetchedRecords().get(tp1); + records = fetcher.fetchedRecords().get(tp0); assertEquals(2, records.size()); - assertEquals(3L, subscriptions.position(tp1).longValue()); + assertEquals(3L, subscriptions.position(tp0).longValue()); assertEquals(1, records.get(0).offset()); assertEquals(2, records.get(1).offset()); - subscriptions.assignFromUser(singleton(tp2)); - client.prepareResponse(matchesOffset(tp2, 4), fetchResponse(tp2, this.nextRecords, Errors.NONE, 100L, 0)); - subscriptions.seek(tp2, 4); + subscriptions.assignFromUser(singleton(tp1)); + client.prepareResponse(matchesOffset(tp1, 4), fetchResponse(tp1, this.nextRecords, Errors.NONE, 100L, 0)); + subscriptions.seek(tp1, 4); assertEquals(1, fetcher.sendFetches()); consumerClient.poll(0); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertNull(fetchedRecords.get(tp1)); - records = fetchedRecords.get(tp2); + assertNull(fetchedRecords.get(tp0)); + records = fetchedRecords.get(tp1); assertEquals(2, records.size()); - assertEquals(6L, subscriptions.position(tp2).longValue()); + assertEquals(6L, subscriptions.position(tp1).longValue()); assertEquals(4, records.get(0).offset()); assertEquals(5, records.get(1).offset()); } @@ -558,16 +558,16 @@ public class FetcherTest { MemoryRecords records = builder.build(); List<ConsumerRecord<byte[], byte[]>> consumerRecords; - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + 
subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, records, Errors.NONE, 100L, 0)); consumerClient.poll(0); - consumerRecords = fetcher.fetchedRecords().get(tp1); + consumerRecords = fetcher.fetchedRecords().get(tp0); assertEquals(3, consumerRecords.size()); - assertEquals(31L, subscriptions.position(tp1).longValue()); // this is the next fetching position + assertEquals(31L, subscriptions.position(tp0).longValue()); // this is the next fetching position assertEquals(15L, consumerRecords.get(0).offset()); assertEquals(20L, consumerRecords.get(1).offset()); @@ -590,7 +590,7 @@ public class FetcherTest { } catch (RecordTooLargeException e) { assertTrue(e.getMessage().startsWith("There are some messages at [Partition=Offset]: ")); // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp1).longValue()); + assertEquals(0, subscriptions.position(tp0).longValue()); } } finally { client.setNodeApiVersions(NodeApiVersions.create()); @@ -612,30 +612,30 @@ public class FetcherTest { } catch (KafkaException e) { assertTrue(e.getMessage().startsWith("Failed to make progress reading messages")); // the position should not advance since no data has been returned - assertEquals(0, subscriptions.position(tp1).longValue()); + assertEquals(0, subscriptions.position(tp0).longValue()); } } private void makeFetchRequestWithIncompleteRecord() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); assertFalse(fetcher.hasCompletedFetches()); MemoryRecords partialRecord = MemoryRecords.readableRecords( ByteBuffer.wrap(new byte[]{0, 0, 0, 0, 0, 0, 0, 0})); - client.prepareResponse(fetchResponse(tp1, 
partialRecord, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, partialRecord, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); } @Test public void testUnauthorizedTopic() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); // resize the limit of the buffer to pretend it is only fetch-size large assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.TOPIC_AUTHORIZATION_FAILED, 100L, 0)); consumerClient.poll(0); try { fetcher.fetchedRecords(); @@ -648,14 +648,14 @@ public class FetcherTest { @Test public void testFetchDuringRebalance() { subscriptions.subscribe(singleton(topicName), listener); - subscriptions.assignFromSubscribed(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromSubscribed(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); // Now the rebalance happens and fetch positions are cleared - subscriptions.assignFromSubscribed(singleton(tp1)); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + subscriptions.assignFromSubscribed(singleton(tp0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); // The active fetch should be ignored since its position is no longer valid @@ -664,34 +664,34 @@ public class FetcherTest { @Test public void testInFlightFetchOnPausedPartition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - subscriptions.pause(tp1); + subscriptions.pause(tp0); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 
0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); - assertNull(fetcher.fetchedRecords().get(tp1)); + assertNull(fetcher.fetchedRecords().get(tp0)); } @Test public void testFetchOnPausedPartition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); - subscriptions.pause(tp1); + subscriptions.pause(tp0); assertFalse(fetcher.sendFetches() > 0); assertTrue(client.requests().isEmpty()); } @Test public void testFetchNotLeaderForPartition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -699,11 +699,11 @@ public class FetcherTest { @Test public void testFetchUnknownTopicOrPartition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds())); @@ -711,62 +711,62 @@ public class FetcherTest { @Test public void testFetchOffsetOutOfRange() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + 
subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); - assertTrue(subscriptions.isOffsetResetNeeded(tp1)); - assertEquals(null, subscriptions.position(tp1)); + assertTrue(subscriptions.isOffsetResetNeeded(tp0)); + assertEquals(null, subscriptions.position(tp0)); } @Test public void testStaleOutOfRangeError() { // verify that an out of range error which arrives after a seek // does not cause us to reset our position or throw an exception - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); - subscriptions.seek(tp1, 1); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + subscriptions.seek(tp0, 1); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertEquals(1, subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertEquals(1, subscriptions.position(tp0).longValue()); } @Test public void testFetchedRecordsAfterSeek() { - subscriptionsNoAutoReset.assignFromUser(singleton(tp1)); - subscriptionsNoAutoReset.seek(tp1, 0); + subscriptionsNoAutoReset.assignFromUser(singleton(tp0)); + subscriptionsNoAutoReset.seek(tp0, 0); assertTrue(fetcherNoAutoReset.sendFetches() > 0); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, 
Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); - assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1)); - subscriptionsNoAutoReset.seek(tp1, 2); + assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp0)); + subscriptionsNoAutoReset.seek(tp0, 2); assertEquals(0, fetcherNoAutoReset.fetchedRecords().size()); } @Test public void testFetchOffsetOutOfRangeException() { - subscriptionsNoAutoReset.assignFromUser(singleton(tp1)); - subscriptionsNoAutoReset.seek(tp1, 0); + subscriptionsNoAutoReset.assignFromUser(singleton(tp0)); + subscriptionsNoAutoReset.seek(tp0, 0); fetcherNoAutoReset.sendFetches(); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L, 0)); consumerClient.poll(0); - assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp1)); + assertFalse(subscriptionsNoAutoReset.isOffsetResetNeeded(tp0)); for (int i = 0; i < 2; i++) { try { fetcherNoAutoReset.fetchedRecords(); fail("Should have thrown OffsetOutOfRangeException"); } catch (OffsetOutOfRangeException e) { - assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1)); + assertTrue(e.offsetOutOfRangePartitions().containsKey(tp0)); assertEquals(e.offsetOutOfRangePartitions().size(), 1); } } @@ -776,16 +776,16 @@ public class FetcherTest { public void testFetchPositionAfterException() { // verify the advancement in the next fetch offset equals to the number of fetched records when // some fetched partitions cause Exception. 
This ensures that consumer won't lose record upon exception - subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2)); + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1)); + subscriptionsNoAutoReset.seek(tp0, 1); subscriptionsNoAutoReset.seek(tp1, 1); - subscriptionsNoAutoReset.seek(tp2, 1); assertEquals(1, fetcherNoAutoReset.sendFetches()); Map<TopicPartition, FetchResponse.PartitionData> partitions = new LinkedHashMap<>(); - partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100, + partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); - partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + partitions.put(tp0, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); consumerClient.poll(0); @@ -796,7 +796,7 @@ public class FetcherTest { for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) fetchedRecords.addAll(records); - assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp2) - 1); + assertEquals(fetchedRecords.size(), subscriptionsNoAutoReset.position(tp1) - 1); try { for (List<ConsumerRecord<byte[], byte[]>> records: fetcherNoAutoReset.fetchedRecords().values()) @@ -805,13 +805,13 @@ public class FetcherTest { exceptions.add(e); } - assertEquals(4, subscriptionsNoAutoReset.position(tp2).longValue()); + assertEquals(4, subscriptionsNoAutoReset.position(tp1).longValue()); assertEquals(3, fetchedRecords.size()); // Should have received one OffsetOutOfRangeException for partition tp1 assertEquals(1, exceptions.size()); OffsetOutOfRangeException e = exceptions.get(0); - assertTrue(e.offsetOutOfRangePartitions().containsKey(tp1)); + 
assertTrue(e.offsetOutOfRangePartitions().containsKey(tp0)); assertEquals(e.offsetOutOfRangePartitions().size(), 1); } @@ -819,51 +819,51 @@ public class FetcherTest { public void testSeekBeforeException() { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptionsNoAutoReset, new Metrics(time), 2); - subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1)); - subscriptionsNoAutoReset.seek(tp1, 1); + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0)); + subscriptionsNoAutoReset.seek(tp0, 1); assertEquals(1, fetcher.sendFetches()); Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); - partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, + partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records)); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0)); consumerClient.poll(0); - assertEquals(2, fetcher.fetchedRecords().get(tp1).size()); + assertEquals(2, fetcher.fetchedRecords().get(tp0).size()); - subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp1, tp2)); - subscriptionsNoAutoReset.seek(tp2, 1); + subscriptionsNoAutoReset.assignFromUser(Utils.mkSet(tp0, tp1)); + subscriptionsNoAutoReset.seek(tp1, 1); assertEquals(1, fetcher.sendFetches()); partitions = new HashMap<>(); - partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)); client.prepareResponse(new FetchResponse(new LinkedHashMap<>(partitions), 0)); consumerClient.poll(0); - assertEquals(1, fetcher.fetchedRecords().get(tp1).size()); + assertEquals(1, fetcher.fetchedRecords().get(tp0).size()); - 
subscriptionsNoAutoReset.seek(tp2, 10); + subscriptionsNoAutoReset.seek(tp1, 10); // Should not throw OffsetOutOfRangeException after the seek assertEquals(0, fetcher.fetchedRecords().size()); } @Test public void testFetchDisconnected() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, this.records, Errors.NONE, 100L, 0), true); + client.prepareResponse(fetchResponse(tp0, this.records, Errors.NONE, 100L, 0), true); consumerClient.poll(0); assertEquals(0, fetcher.fetchedRecords().size()); // disconnects should have no affect on subscription state - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(0, subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(0, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionsNoneCommittedNoResetStrategy() { - Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp1, tp2)); + Set<TopicPartition> tps = new HashSet<>(Arrays.asList(tp0, tp1)); subscriptionsNoAutoReset.assignFromUser(tps); try { fetcherNoAutoReset.updateFetchPositions(tps); @@ -879,38 +879,38 @@ public class FetcherTest { public void testUpdateFetchPositionToCommitted() { // unless a specific reset is expected, the default behavior is to reset to the committed // position if one is present - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.committed(tp1, new OffsetAndMetadata(5)); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.committed(tp0, new OffsetAndMetadata(5)); - fetcher.updateFetchPositions(singleton(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + 
fetcher.updateFetchPositions(singleton(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionResetToDefaultOffset() { - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); // with no commit position, we should reset using the default strategy defined above (EARLIEST) client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L)); - fetcher.updateFetchPositions(singleton(tp1)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + fetcher.updateFetchPositions(singleton(tp0)); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionResetToLatestOffset() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L)); - fetcher.updateFetchPositions(singleton(tp1)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + fetcher.updateFetchPositions(singleton(tp0)); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } @Test @@ -919,8 +919,8 @@ public class FetcherTest { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(), new ByteArrayDeserializer(), 
Integer.MAX_VALUE, isolationLevel); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST); client.prepareResponse(new MockClient.RequestMatcher() { @Override @@ -929,30 +929,30 @@ public class FetcherTest { return request.isolationLevel() == isolationLevel; } }, listOffsetResponse(Errors.NONE, 1L, 5L)); - fetcher.updateFetchPositions(singleton(tp1)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + fetcher.updateFetchPositions(singleton(tp0)); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } } @Test public void testUpdateFetchPositionResetToEarliestOffset() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.needOffsetReset(tp0, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L)); - fetcher.updateFetchPositions(singleton(tp1)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + fetcher.updateFetchPositions(singleton(tp0)); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionDisconnect() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); + subscriptions.assignFromUser(singleton(tp0)); + 
subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST); // First request gets a disconnect client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), @@ -961,72 +961,72 @@ public class FetcherTest { // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 5L)); - fetcher.updateFetchPositions(singleton(tp1)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertTrue(subscriptions.isFetchable(tp1)); - assertEquals(5, subscriptions.position(tp1).longValue()); + fetcher.updateFetchPositions(singleton(tp0)); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertTrue(subscriptions.isFetchable(tp0)); + assertEquals(5, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.committed(tp1, new OffsetAndMetadata(0)); - subscriptions.pause(tp1); // paused partition does not have a valid position - subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.committed(tp0, new OffsetAndMetadata(0)); + subscriptions.pause(tp0); // paused partition does not have a valid position + subscriptions.needOffsetReset(tp0, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 10L)); - fetcher.updateFetchPositions(singleton(tp1)); + fetcher.updateFetchPositions(singleton(tp0)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused - assertTrue(subscriptions.hasValidPosition(tp1)); - assertEquals(10, subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused + 
assertTrue(subscriptions.hasValidPosition(tp0)); + assertEquals(10, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.pause(tp1); // paused partition does not have a valid position + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.pause(tp0); // paused partition does not have a valid position client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, 1L, 0L)); - fetcher.updateFetchPositions(singleton(tp1)); + fetcher.updateFetchPositions(singleton(tp0)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused - assertTrue(subscriptions.hasValidPosition(tp1)); - assertEquals(0, subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp0)); + assertEquals(0, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.committed(tp1, new OffsetAndMetadata(0)); - subscriptions.pause(tp1); // paused partition does not have a valid position - subscriptions.seek(tp1, 10); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.committed(tp0, new OffsetAndMetadata(0)); + subscriptions.pause(tp0); // paused partition does not have a valid position + subscriptions.seek(tp0, 10); - fetcher.updateFetchPositions(singleton(tp1)); + fetcher.updateFetchPositions(singleton(tp0)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused - assertTrue(subscriptions.hasValidPosition(tp1)); - assertEquals(10, 
subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp0)); + assertEquals(10, subscriptions.position(tp0).longValue()); } @Test public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.committed(tp1, new OffsetAndMetadata(0)); - subscriptions.seek(tp1, 10); - subscriptions.pause(tp1); // paused partition already has a valid position + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.committed(tp0, new OffsetAndMetadata(0)); + subscriptions.seek(tp0, 10); + subscriptions.pause(tp0); // paused partition already has a valid position - fetcher.updateFetchPositions(singleton(tp1)); + fetcher.updateFetchPositions(singleton(tp0)); - assertFalse(subscriptions.isOffsetResetNeeded(tp1)); - assertFalse(subscriptions.isFetchable(tp1)); // because tp is paused - assertTrue(subscriptions.hasValidPosition(tp1)); - assertEquals(10, subscriptions.position(tp1).longValue()); + assertFalse(subscriptions.isOffsetResetNeeded(tp0)); + assertFalse(subscriptions.isFetchable(tp0)); // because tp is paused + assertTrue(subscriptions.hasValidPosition(tp0)); + assertEquals(10, subscriptions.position(tp0).longValue()); } @Test @@ -1117,7 +1117,7 @@ public class FetcherTest { ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true, null); client.send(request, time.milliseconds()); client.poll(1, time.milliseconds()); - FetchResponse response = fetchResponse(tp1, nextRecords, Errors.NONE, i, throttleTimeMs); + FetchResponse response = fetchResponse(tp0, nextRecords, Errors.NONE, i, throttleTimeMs); buffer = response.serialize(ApiKeys.FETCH.latestVersion(), new ResponseHeader(request.correlationId())); selector.completeReceive(new NetworkReceive(node.idString(), buffer)); client.poll(1, 
time.milliseconds()); @@ -1137,11 +1137,11 @@ public class FetcherTest { */ @Test public void testFetcherMetrics() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax); - MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup); + MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric); @@ -1150,7 +1150,7 @@ public class FetcherTest { assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); // recordsFetchLagMax should be hw - fetchOffset after receiving an empty FetchResponse - fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 0); + fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 0); assertEquals(100, recordsFetchLagMax.value(), EPSILON); KafkaMetric partitionLag = allMetrics.get(partitionLagMetric); @@ -1161,7 +1161,7 @@ public class FetcherTest { TimestampType.CREATE_TIME, 0L); for (int v = 0; v < 3; v++) builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes()); - fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 0); + fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 0); assertEquals(197, recordsFetchLagMax.value(), EPSILON); assertEquals(197, partitionLag.value(), EPSILON); @@ -1176,11 +1176,11 @@ public class FetcherTest { fetcher = createFetcher(subscriptions, metrics, new ByteArrayDeserializer(), new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); MetricName maxLagMetric = metrics.metricInstance(metricsRegistry.recordsLagMax); - 
MetricName partitionLagMetric = metrics.metricName(tp1 + ".records-lag", metricGroup); + MetricName partitionLagMetric = metrics.metricName(tp0 + ".records-lag", metricGroup); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric recordsFetchLagMax = allMetrics.get(maxLagMetric); @@ -1189,7 +1189,7 @@ public class FetcherTest { assertEquals(Double.NEGATIVE_INFINITY, recordsFetchLagMax.value(), EPSILON); // recordsFetchLagMax should be lso - fetchOffset after receiving an empty FetchResponse - fetchRecords(tp1, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0); + fetchRecords(tp0, MemoryRecords.EMPTY, Errors.NONE, 100L, 50L, 0); assertEquals(50, recordsFetchLagMax.value(), EPSILON); KafkaMetric partitionLag = allMetrics.get(partitionLagMetric); @@ -1200,7 +1200,7 @@ public class FetcherTest { TimestampType.CREATE_TIME, 0L); for (int v = 0; v < 3; v++) builder.appendWithOffset(v, RecordBatch.NO_TIMESTAMP, "key".getBytes(), ("value-" + v).getBytes()); - fetchRecords(tp1, builder.build(), Errors.NONE, 200L, 150L, 0); + fetchRecords(tp0, builder.build(), Errors.NONE, 200L, 150L, 0); assertEquals(147, recordsFetchLagMax.value(), EPSILON); assertEquals(147, partitionLag.value(), EPSILON); @@ -1211,8 +1211,8 @@ public class FetcherTest { @Test public void testFetchResponseMetrics() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg)); @@ -1228,15 +1228,15 @@ public class FetcherTest { for (Record record : records.records()) expectedBytes += record.sizeInBytes(); - fetchRecords(tp1, records, Errors.NONE, 100L, 0); + fetchRecords(tp0, records, Errors.NONE, 100L, 0); assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON); assertEquals(3, recordsCountAverage.value(), EPSILON); } @Test public 
void testFetchResponseMetricsPartialResponse() { - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 1); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 1); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg)); @@ -1254,16 +1254,16 @@ public class FetcherTest { expectedBytes += record.sizeInBytes(); } - fetchRecords(tp1, records, Errors.NONE, 100L, 0); + fetchRecords(tp0, records, Errors.NONE, 100L, 0); assertEquals(expectedBytes, fetchSizeAverage.value(), EPSILON); assertEquals(2, recordsCountAverage.value(), EPSILON); } @Test public void testFetchResponseMetricsWithOnePartitionError() { - subscriptions.assignFromUser(Utils.mkSet(tp1, tp2)); + subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); + subscriptions.seek(tp0, 0); subscriptions.seek(tp1, 0); - subscriptions.seek(tp2, 0); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg)); @@ -1276,9 +1276,9 @@ public class FetcherTest { MemoryRecords records = builder.build(); Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); - partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, + partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records)); - partitions.put(tp2, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, + partitions.put(tp1, new FetchResponse.PartitionData(Errors.OFFSET_OUT_OF_RANGE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, MemoryRecords.EMPTY)); assertEquals(1, fetcher.sendFetches()); @@ -1296,9 +1296,9 @@ public class FetcherTest { @Test public void testFetchResponseMetricsWithOnePartitionAtTheWrongOffset() { - subscriptions.assignFromUser(Utils.mkSet(tp1, tp2)); + 
subscriptions.assignFromUser(Utils.mkSet(tp0, tp1)); + subscriptions.seek(tp0, 0); subscriptions.seek(tp1, 0); - subscriptions.seek(tp2, 0); Map<MetricName, KafkaMetric> allMetrics = metrics.metrics(); KafkaMetric fetchSizeAverage = allMetrics.get(metrics.metricInstance(metricsRegistry.fetchSizeAvg)); @@ -1306,7 +1306,7 @@ public class FetcherTest { // send the fetch and then seek to a new offset assertEquals(1, fetcher.sendFetches()); - subscriptions.seek(tp2, 5); + subscriptions.seek(tp1, 5); MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 0L); @@ -1315,9 +1315,9 @@ public class FetcherTest { MemoryRecords records = builder.build(); Map<TopicPartition, FetchResponse.PartitionData> partitions = new HashMap<>(); - partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, + partitions.put(tp0, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records)); - partitions.put(tp2, new FetchResponse.PartitionData(Errors.NONE, 100, + partitions.put(tp1, new FetchResponse.PartitionData(Errors.NONE, 100, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("val".getBytes())))); @@ -1361,6 +1361,8 @@ public class FetcherTest { public void testGetOffsetsForTimes() { // Empty map assertTrue(fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty()); + // Unknown Offset + testGetOffsetsForTimesWithUnknownOffset(); // Error code none with unknown offset testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L); // Error code none with known offset @@ -1379,15 +1381,15 @@ public class FetcherTest { @Test(expected = TimeoutException.class) public void testBatchedListOffsetsMetadataErrors() { Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); - partitionData.put(tp1, new 
ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, + partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); - partitionData.put(tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); client.prepareResponse(new ListOffsetResponse(0, partitionData)); Map<TopicPartition, Long> offsetsToSearch = new HashMap<>(); + offsetsToSearch.put(tp0, ListOffsetRequest.EARLIEST_TIMESTAMP); offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP); - offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP); fetcher.getOffsetsByTimes(offsetsToSearch, 0); } @@ -1410,9 +1412,9 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0)); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1423,7 +1425,7 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertFalse(fetchedRecords.containsKey(tp1)); + assertFalse(fetchedRecords.containsKey(tp0)); } @Test @@ -1442,9 +1444,9 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // 
normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1462,8 +1464,8 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertTrue(fetchedRecords.containsKey(tp1)); - assertEquals(fetchedRecords.get(tp1).size(), 2); + assertTrue(fetchedRecords.containsKey(tp0)); + assertEquals(fetchedRecords.get(tp0).size(), 2); } @Test @@ -1519,9 +1521,9 @@ public class FetcherTest { buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1532,9 +1534,9 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertTrue(fetchedRecords.containsKey(tp1)); + assertTrue(fetchedRecords.containsKey(tp0)); // There are only 3 committed records - List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1); + List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0); Set<String> fetchedKeys = new HashSet<>(); for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) { fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8)); @@ -1566,9 +1568,9 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0)); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1579,9 +1581,9 @@ public class 
FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertTrue(fetchedRecords.containsKey(tp1)); - assertEquals(fetchedRecords.get(tp1).size(), 2); - List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1); + assertTrue(fetchedRecords.containsKey(tp0)); + assertEquals(fetchedRecords.get(tp0).size(), 2); + List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp0); Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2")); Set<String> actuallyCommittedKeys = new HashSet<>(); for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) { @@ -1610,8 +1612,8 @@ public class FetcherTest { buffer.flip(); // send the fetch - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); // prepare the response. 
the aborted transactions begin at offsets which are no longer in the log @@ -1624,8 +1626,8 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords(); - assertTrue(allFetchedRecords.containsKey(tp1)); - List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1); + assertTrue(allFetchedRecords.containsKey(tp0)); + List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp0); assertEquals(3, fetchedRecords.size()); assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(fetchedRecords)); } @@ -1639,7 +1641,7 @@ public class FetcherTest { new SimpleRecord(null, "value".getBytes())); // Remove the last record to simulate compaction - MemoryRecords.FilterResult result = records.filterTo(tp1, new MemoryRecords.RecordFilter() { + MemoryRecords.FilterResult result = records.filterTo(tp0, new MemoryRecords.RecordFilter() { @Override protected BatchRetention checkBatchRetention(RecordBatch batch) { return BatchRetention.DELETE_EMPTY; @@ -1653,16 +1655,16 @@ public class FetcherTest { result.output.flip(); MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.output); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, compactedRecords, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, compactedRecords, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> allFetchedRecords = fetcher.fetchedRecords(); - assertTrue(allFetchedRecords.containsKey(tp1)); - List<ConsumerRecord<byte[], byte[]>> fetchedRecords = allFetchedRecords.get(tp1); + assertTrue(allFetchedRecords.containsKey(tp0)); + 
List<ConsumerRecord<byte[], byte[]>> fetchedRecords = allFetchedRecords.get(tp0); assertEquals(3, fetchedRecords.size()); for (int i = 0; i < 3; i++) { @@ -1670,7 +1672,7 @@ public class FetcherTest { } // The next offset should point to the next batch - assertEquals(4L, subscriptions.position(tp1).longValue()); + assertEquals(4L, subscriptions.position(tp0).longValue()); } @Test @@ -1688,10 +1690,10 @@ public class FetcherTest { buffer.flip(); MemoryRecords recordsWithEmptyBatch = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); - client.prepareResponse(fetchResponse(tp1, recordsWithEmptyBatch, Errors.NONE, 100L, 0)); + client.prepareResponse(fetchResponse(tp0, recordsWithEmptyBatch, Errors.NONE, 100L, 0)); consumerClient.poll(0); assertTrue(fetcher.hasCompletedFetches()); @@ -1699,7 +1701,7 @@ public class FetcherTest { assertTrue(allFetchedRecords.isEmpty()); // The next offset should point to the next batch - assertEquals(lastOffset + 1, subscriptions.position(tp1).longValue()); + assertEquals(lastOffset + 1, subscriptions.position(tp0).longValue()); } @Test @@ -1743,8 +1745,8 @@ public class FetcherTest { buffer.flip(); // send the fetch - subscriptions.assignFromUser(singleton(tp1)); - subscriptions.seek(tp1, 0); + subscriptions.assignFromUser(singleton(tp0)); + subscriptions.seek(tp0, 0); assertEquals(1, fetcher.sendFetches()); // prepare the response. 
the aborted transactions begin at offsets which are no longer in the log @@ -1758,8 +1760,8 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords(); - assertTrue(allFetchedRecords.containsKey(tp1)); - List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1); + assertTrue(allFetchedRecords.containsKey(tp0)); + List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp0); assertEquals(5, fetchedRecords.size()); assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(fetchedRecords)); } @@ -1782,9 +1784,9 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0)); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1795,7 +1797,7 @@ public class FetcherTest { assertTrue(fetcher.hasCompletedFetches()); Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); - assertTrue(fetchedRecords.containsKey(tp1)); + assertTrue(fetchedRecords.containsKey(tp0)); } @Test @@ -1815,9 +1817,9 @@ public class FetcherTest { List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>(); abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 0)); MemoryRecords records = MemoryRecords.readableRecords(buffer); - subscriptions.assignFromUser(singleton(tp1)); + subscriptions.assignFromUser(singleton(tp0)); - subscriptions.seek(tp1, 0); + subscriptions.seek(tp0, 0); // normal fetch assertEquals(1, fetcher.sendFetches()); @@ -1830,8 +1832,8 @@ public class FetcherTest { Map<TopicPartition, 
List<ConsumerRecord<byte[], byte[]>>> fetchedRecords = fetcher.fetchedRecords(); // Ensure that we don't return any of the aborted records, but yet advance the consumer position. - assertFalse(fetchedRecords.containsKey(tp1)); - assertEquals(currentOffset, (long) subscriptions.position(tp1)); + assertFalse(fetchedRecords.containsKey(tp0)); + assertEquals(currentOffset, (long) subscriptions.position(tp0)); } private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord... records) { @@ -1873,8 +1875,6 @@ public class FetcherTest { Long expectedOffsetForTp0, Long expectedOffsetForTp1) { client.reset(); - TopicPartition tp0 = tp1; - TopicPartition tp1 = new TopicPartition(topicName, 1); // Ensure metadata has both partition. Cluster cluster = TestUtils.clusterWith(2, topicName, 2); metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); @@ -1906,19 +1906,39 @@ public class FetcherTest { } } + private void testGetOffsetsForTimesWithUnknownOffset() { + client.reset(); + // Ensure metadata has both partition. 
+ Cluster cluster = TestUtils.clusterWith(1, topicName, 1); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + + Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); + partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NONE, + ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); + + client.prepareResponseFrom(new ListOffsetResponse(0, partitionData), cluster.leaderFor(tp0)); + + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, 0L); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE); + + assertTrue(offsetAndTimestampMap.containsKey(tp0)); + assertNull(offsetAndTimestampMap.get(tp0)); + } + private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) { // matches any list offset request with the provided timestamp return new MockClient.RequestMatcher() { @Override public boolean matches(AbstractRequest body) { ListOffsetRequest req = (ListOffsetRequest) body; - return timestamp == req.partitionTimestamps().get(tp1); + return timestamp == req.partitionTimestamps().get(tp0); } }; } private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) { - return listOffsetResponse(tp1, error, timestamp, offset); + return listOffsetResponse(tp0, error, timestamp, offset); } private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) { @@ -1934,7 +1954,7 @@ public class FetcherTest { long lastStableOffset, long hw, int throttleTime) { - Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1, + Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp0, new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records)); return new FetchResponse(new 
LinkedHashMap<>(partitions), throttleTime); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 4c1f593..77f11df 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -480,7 +480,7 @@ object ConsumerGroupCommand extends Logging { val consumer = getConsumer() consumer.assign(List(topicPartition).asJava) val offsetsForTimes = consumer.offsetsForTimes(Map(topicPartition -> timestamp).asJava) - if (offsetsForTimes != null && !offsetsForTimes.isEmpty) + if (offsetsForTimes != null && !offsetsForTimes.isEmpty && offsetsForTimes.get(topicPartition) != null) LogOffsetResult.LogOffset(offsetsForTimes.get(topicPartition).offset) else { getLogEndOffset(topicPartition) http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala index ddac764..c9c40a9 100644 --- a/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala +++ b/core/src/test/scala/integration/kafka/api/LegacyAdminClientTest.scala @@ -158,6 +158,12 @@ class LegacyAdminClientTest extends IntegrationTestHarness with Logging { } @Test + def testOffsetsForTimesWhenOffsetNotFound() { + val consumer = consumers.head + assertNull(consumer.offsetsForTimes(Map(tp -> new JLong(0L)).asJava).get(tp)) + } + + @Test def testOffsetsForTimesAfterDeleteRecords() { val consumer = consumers.head subscribeAndWaitForAssignment(topic, consumer) 
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bb53e03/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index bfe5419..e61f6c7 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -28,6 +28,8 @@ To upgrade from earlier versions, please review the <a href="#upgrade_11_0_0">0. <ul> <li>Topic deletion is now enabled by default, since the functionality is now stable. Users who wish to retain the previous behavior should set the broker config <code>delete.topic.enable</code> to <code>false</code>. Keep in mind that topic deletion removes data and the operation is not reversible (i.e. there is no "undelete" operation)</li> + <li>For topics that support timestamp search, if no offset can be found for a partition, that partition is now included in the search result with a null offset value. Previously, the partition was not included in the map. + This change was made to make the search behavior consistent with the case of topics not supporting timestamp search.</li> </ul> <h4><a id="upgrade_11_0_0" href="#upgrade_11_0_0">Upgrading from 0.8.x, 0.9.x, 0.10.0.x, 0.10.1.x or 0.10.2.x to 0.11.0.0</a></h4>
