Repository: kafka
Updated Branches:
  refs/heads/0.11.0 c63e9baeb -> ce6e638bd


KAFKA-5349; Fix illegal state error in consumer's ListOffset handler

Author: Jason Gustafson <[email protected]>

Reviewers: Apurva Mehta <[email protected]>, Ismael Juma <[email protected]>

Closes #3175 from hachikuji/KAFKA-5349

(cherry picked from commit aebba89a2b9b5ea6a7cab2599555232ef3fe21ad)
Signed-off-by: Ismael Juma <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ce6e638b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ce6e638b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ce6e638b

Branch: refs/heads/0.11.0
Commit: ce6e638bd9d3fd0d3df18d4439a7dae7ec492c44
Parents: c63e9ba
Author: Jason Gustafson <[email protected]>
Authored: Wed May 31 10:23:39 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Wed May 31 10:28:41 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/internals/Fetcher.java | 10 ++++++++--
 .../clients/consumer/internals/FetcherTest.java   | 18 +++++++++++++++++-
 2 files changed, 25 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ce6e638b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e3f2355..c2beff8 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -680,7 +680,10 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
      * Callback for the response of the list offset call above.
      * @param timestampsToSearch The mapping from partitions to target 
timestamps
      * @param listOffsetResponse The response from the server.
-     * @param future The future to be completed by the response.
+     * @param future The future to be completed when the response returns. 
Note that any partition-level errors will
+     *               generally fail the entire future result. The one 
exception is UNSUPPORTED_FOR_MESSAGE_FORMAT,
+     *               which indicates that the broker does not support the v1 
message format and results in a null
+     *               being inserted into the resulting map.
      */
     @SuppressWarnings("deprecation")
     private void handleListOffsetResponse(Map<TopicPartition, Long> 
timestampsToSearch,
@@ -728,13 +731,16 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
                 log.debug("Attempt to fetch offsets for partition {} failed 
due to obsolete leadership information, retrying.",
                         topicPartition);
                 future.raise(error);
+                return;
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in 
ListOffset request for partition {}. The topic/partition " +
-                        "may not exist or the user may not have Describe 
access to it", topicPartition);
+                        "may not exist or the user may not have Describe 
access to it.", topicPartition);
                 future.raise(error);
+                return;
             } else {
                 log.warn("Attempt to fetch offsets for partition {} failed due 
to: {}", topicPartition, error.message());
                 future.raise(new StaleMetadataException());
+                return;
             }
         }
         if (!future.isDone())

http://git-wip-us.apache.org/repos/asf/kafka/blob/ce6e638b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index fedec2a..7d48623 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -1332,6 +1332,22 @@ public class FetcherTest {
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, 
Errors.NONE, 10L, 100L, 10L, 100L);
     }
 
+    @Test(expected = TimeoutException.class)
+    public void testBatchedListOffsetsMetadataErrors() {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = 
new HashMap<>();
+        partitionData.put(tp1, new 
ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+                ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET));
+        partitionData.put(tp2, new 
ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
+                ListOffsetResponse.UNKNOWN_TIMESTAMP, 
ListOffsetResponse.UNKNOWN_OFFSET));
+        client.prepareResponse(new ListOffsetResponse(0, partitionData));
+
+        Map<TopicPartition, Long> offsetsToSearch = new HashMap<>();
+        offsetsToSearch.put(tp1, ListOffsetRequest.EARLIEST_TIMESTAMP);
+        offsetsToSearch.put(tp2, ListOffsetRequest.EARLIEST_TIMESTAMP);
+
+        fetcher.getOffsetsByTimes(offsetsToSearch, 0);
+    }
+
     @Test
     public void testSkippingAbortedTransactions() {
         Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new 
Metrics(), new ByteArrayDeserializer(),
@@ -1343,7 +1359,7 @@ public class FetcherTest {
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()),
                 new SimpleRecord(time.milliseconds(), "key".getBytes(), 
"value".getBytes()));
 
-        currentOffset += abortTransaction(buffer, 1L, currentOffset);
+        abortTransaction(buffer, 1L, currentOffset);
 
         buffer.flip();
 

Reply via email to