Repository: kafka Updated Branches: refs/heads/0.11.0 c63e9baeb -> ce6e638bd
KAFKA-5349; Fix illegal state error in consumer's ListOffset handler Author: Jason Gustafson <[email protected]> Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #3175 from hachikuji/KAFKA-5349 (cherry picked from commit aebba89a2b9b5ea6a7cab2599555232ef3fe21ad) Signed-off-by: Ismael Juma <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce6e638b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce6e638b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce6e638b Branch: refs/heads/0.11.0 Commit: ce6e638bd9d3fd0d3df18d4439a7dae7ec492c44 Parents: c63e9ba Author: Jason Gustafson <[email protected]> Authored: Wed May 31 10:23:39 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed May 31 10:28:41 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/consumer/internals/Fetcher.java | 10 ++++++++-- .../clients/consumer/internals/FetcherTest.java | 18 +++++++++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/ce6e638b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index e3f2355..c2beff8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -680,7 +680,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { * Callback for the response of the list offset call above. * @param timestampsToSearch The mapping from partitions to target timestamps * @param listOffsetResponse The response from the server. - * @param future The future to be completed by the response. + * @param future The future to be completed when the response returns. Note that any partition-level errors will + * generally fail the entire future result. The one exception is UNSUPPORTED_FOR_MESSAGE_FORMAT, + * which indicates that the broker does not support the v1 message format and results in a null + * being inserted into the resulting map. */ @SuppressWarnings("deprecation") private void handleListOffsetResponse(Map<TopicPartition, Long> timestampsToSearch, @@ -728,13 +731,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", topicPartition); future.raise(error); + return; } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) { log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition " + - "may not exist or the user may not have Describe access to it", topicPartition); + "may not exist or the user may not have Describe access to it.", topicPartition); future.raise(error); + return; } else { log.warn("Attempt to fetch offsets for partition {} failed due to: {}", topicPartition, error.message()); future.raise(new StaleMetadataException()); + return; } } if (!future.isDone()) http://git-wip-us.apache.org/repos/asf/kafka/blob/ce6e638b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index fedec2a..7d48623 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -1332,6 +1332,22 @@ public class FetcherTest { testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L); } + @Test(expected = TimeoutException.class) + public void testBatchedListOffsetsMetadataErrors() { + Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); + partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION, + ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); + partitionData.put(tp2, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION, + ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET)); + client.prepareResponse(new ListOffsetResponse(0, partitionData)); + + Map<TopicPartition, Long> offsetsToSearch = new HashMap<>(); + offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP); + offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP); + + fetcher.getOffsetsByTimes(offsetsToSearch, 0); + } + @Test public void testSkippingAbortedTransactions() { Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(), @@ -1343,7 +1359,7 @@ public class FetcherTest { new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()), new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes())); - currentOffset += abortTransaction(buffer, 1L, currentOffset); + abortTransaction(buffer, 1L, currentOffset); buffer.flip();
