This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.1 by this push:
     new f263773  KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627)
f263773 is described below

commit f26377352d14af38af5d6cf42531b940fafe7236
Author: Anna Povzner <[email protected]>
AuthorDate: Tue Sep 11 10:10:42 2018 -0700

    KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627)
    
    A call to `kafka-consumer-groups --describe --group ...` can result in a NullPointerException for two reasons:
    1) `Fetcher.fetchOffsetsByTimes()` may return too early, without sending a list offsets request for topic partitions that are not in the cached metadata.
    2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets(), which eventually call Fetcher.fetchOffsetsByTimes(), would return a map containing every topic partition passed to endOffsets()/beginningOffsets(), and that none of the values would be null. Because of (1), null values were possible when some of the topic partitions were already known (present in the metadata cache) and others were not (the metadata cache had no entries for them).  [...]
    
    Testing:
    -- added a unit test to verify the fix in Fetcher.fetchOffsetsByTimes()
    -- did some manual testing with `kafka-consumer-groups --describe`, which reproduced the NPE. Was not able to reproduce any of the NPE cases with DescribeConsumerGroupTest.scala.
    
    Reviewers: Jason Gustafson <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../kafka/clients/consumer/internals/Fetcher.java  | 25 ++++++++++---
 .../clients/consumer/internals/FetcherTest.java    | 43 ++++++++++++++++++++++
 .../scala/kafka/admin/ConsumerGroupCommand.scala   | 20 +++++-----
 4 files changed, 73 insertions(+), 17 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index a5e94a3..b87d1dd 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -70,7 +70,7 @@
               files="MockAdminClient.java"/>
 
     <suppress checks="JavaNCSS"
-              files="RequestResponseTest.java"/>
+              files="RequestResponseTest.java|FetcherTest.java"/>
 
     <suppress checks="NPathComplexity"
               files="MemoryRecordsTest|MetricsTest"/>
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e81195e..e04e9fe 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -419,7 +419,7 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
                 if (value.partitionsToRetry.isEmpty())
                     return result;
 
-                
remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
+                remainingToSearch.keySet().retainAll(value.partitionsToRetry);
             } else if (!future.isRetriable()) {
                 throw future.exception();
             }
@@ -568,7 +568,8 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
         for (TopicPartition tp : partitionResetTimestamps.keySet())
             metadata.add(tp.topic());
 
-        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = 
groupListOffsetRequests(partitionResetTimestamps);
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
+                groupListOffsetRequests(partitionResetTimestamps, new 
HashSet<TopicPartition>());
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : 
timestampsToSearchByNode.entrySet()) {
             final Map<TopicPartition, Long> resetTimestamps = entry.getValue();
             subscriptions.setResetPending(resetTimestamps.keySet(), 
time.milliseconds() + requestTimeoutMs);
@@ -616,18 +617,19 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
         for (TopicPartition tp : timestampsToSearch.keySet())
             metadata.add(tp.topic());
 
-        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = 
groupListOffsetRequests(timestampsToSearch);
+        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
+        Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode =
+                groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
         if (timestampsToSearchByNode.isEmpty())
             return RequestFuture.failure(new StaleMetadataException());
 
         final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new 
RequestFuture<>();
         final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new 
HashMap<>();
-        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
         final AtomicInteger remainingResponses = new 
AtomicInteger(timestampsToSearchByNode.size());
 
         for (Map.Entry<Node, Map<TopicPartition, Long>> entry : 
timestampsToSearchByNode.entrySet()) {
             RequestFuture<ListOffsetResult> future =
-                    sendListOffsetRequest(entry.getKey(), entry.getValue(), 
requireTimestamps);
+                sendListOffsetRequest(entry.getKey(), entry.getValue(), 
requireTimestamps);
             future.addListener(new RequestFutureListener<ListOffsetResult>() {
                 @Override
                 public void onSuccess(ListOffsetResult partialResult) {
@@ -654,7 +656,16 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
         return listOffsetRequestsFuture;
     }
 
-    private Map<Node, Map<TopicPartition, Long>> 
groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch) {
+        /**
+         * Groups timestamps to search by node for topic partitions in 
`timestampsToSearch` that have
+         * leaders available. Topic partitions from `timestampsToSearch` that 
do not have their leader
+         * available are added to `partitionsToRetry`
+         * @param timestampsToSearch The mapping from partitions to the target timestamps
+         * @param partitionsToRetry A set of topic partitions that will be 
extended with partitions
+         *                          that need metadata update or re-connect to 
the leader.
+         */
+    private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests(
+            Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> 
partitionsToRetry) {
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = 
new HashMap<>();
         for (Map.Entry<TopicPartition, Long> entry: 
timestampsToSearch.entrySet()) {
             TopicPartition tp  = entry.getKey();
@@ -663,9 +674,11 @@ public class Fetcher<K, V> implements 
SubscriptionState.Listener, Closeable {
                 metadata.add(tp.topic());
                 log.debug("Partition {} is unknown for fetching offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else if (info.leader() == null) {
                 log.debug("Leader for partition {} unavailable for fetching 
offset", tp);
                 metadata.requestUpdate();
+                partitionsToRetry.add(tp);
             } else {
                 Node node = info.leader();
                 Map<TopicPartition, Long> topicData = 
timestampsToSearchByNode.get(node);
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 832b127..23b67f4 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -106,6 +106,7 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -1633,6 +1634,48 @@ public class FetcherTest {
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, 
Errors.NONE, 10L, 100L, 10L, 100L);
     }
 
+    @Test
+    public void 
testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
+        final String anotherTopic = "another-topic";
+        final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
+
+        client.reset();
+
+        // Metadata initially has one topic
+        Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
+        metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds());
+
+        // The first metadata refresh should contain one topic
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), 
false);
+        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 
11L), cluster.leaderFor(tp0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 
32L), cluster.leaderFor(tp1));
+
+        // Second metadata refresh should contain two topics
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(anotherTopic, 1);
+        Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
+        client.prepareMetadataUpdate(updatedCluster, 
Collections.<String>emptySet(), false);
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 
1000L, 54L), cluster.leaderFor(t2p0));
+
+        Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+        timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(tp1, ListOffsetRequest.LATEST_TIMESTAMP);
+        timestampToSearch.put(t2p0, ListOffsetRequest.LATEST_TIMESTAMP);
+        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+            fetcher.offsetsByTimes(timestampToSearch, Long.MAX_VALUE);
+
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null 
result for " + tp0,
+                      offsetAndTimestampMap.get(tp0));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null 
result for " + tp1,
+                      offsetAndTimestampMap.get(tp1));
+        assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null 
result for " + t2p0,
+                      offsetAndTimestampMap.get(t2p0));
+        assertEquals(11L, offsetAndTimestampMap.get(tp0).offset());
+        assertEquals(32L, offsetAndTimestampMap.get(tp1).offset());
+        assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
+    }
+
     @Test(expected = TimeoutException.class)
     public void testBatchedListOffsetsMetadataErrors() {
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = 
new HashMap<>();
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 6818631..8be0be6 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -326,12 +326,8 @@ object ConsumerGroupCommand extends Logging {
       }
 
       getLogEndOffsets(topicPartitions).map {
-        logEndOffsetResult =>
-          logEndOffsetResult._2 match {
-            case LogOffsetResult.LogOffset(logEndOffset) => 
getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
-            case LogOffsetResult.Unknown => 
getDescribePartitionResult(logEndOffsetResult._1, None)
-            case LogOffsetResult.Ignore => null
-          }
+        case (topicPartition, LogOffsetResult.LogOffset(offset)) => 
getDescribePartitionResult(topicPartition, Some(offset))
+        case (topicPartition, _) => getDescribePartitionResult(topicPartition, 
None)
       }.toArray
     }
 
@@ -608,16 +604,20 @@ object ConsumerGroupCommand extends Logging {
     protected def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): 
Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.endOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logEndOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logEndOffset) => topicPartition -> 
LogOffsetResult.LogOffset(logEndOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }
 
     protected def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): 
Map[TopicPartition, LogOffsetResult] = {
       val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
       topicPartitions.map { topicPartition =>
-        val logStartOffset = offsets.get(topicPartition)
-        topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+        Option(offsets.get(topicPartition)) match {
+          case Some(logStartOffset) => topicPartition -> 
LogOffsetResult.LogOffset(logStartOffset)
+          case _ => topicPartition -> LogOffsetResult.Unknown
+        }
       }.toMap
     }
 

Reply via email to