This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.0 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit abf45d185d5848e017b8aadff4cbda133421d1f8 Author: Anna Povzner <[email protected]> AuthorDate: Tue Sep 11 10:10:42 2018 -0700 KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer group (#5627) A call to `kafka-consumer-groups --describe --group ...` can result in NullPointerException for two reasons: 1) `Fetcher.fetchOffsetsByTimes()` may return too early, without sending list offsets request for topic partitions that are not in cached metadata. 2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()` assumed that endOffsets()/beginningOffsets() which eventually call Fetcher.fetchOffsetsByTimes(), would return a map with all the topic partitions passed to endOffsets()/beginningOffsets() and that values are not null. Because of (1), null values were possible if some of the topic partitions were already known (in metadata cache) and some not (metadata cache did not have entries for some of the topic partitions). [...] Testing: -- added unit test to verify fix in Fetcher.fetchOffsetsByTimes() -- did some manual testing with `kafka-consumer-groups --describe`, causing NPE. 
Was not able to reproduce any NPE cases with DescribeConsumerGroupTest.scala, Reviewers: Jason Gustafson <[email protected]> --- checkstyle/suppressions.xml | 2 +- .../kafka/clients/consumer/internals/Fetcher.java | 28 ++++++++++---- .../clients/consumer/internals/FetcherTest.java | 43 ++++++++++++++++++++++ .../scala/kafka/admin/ConsumerGroupCommand.scala | 20 +++++----- 4 files changed, 75 insertions(+), 18 deletions(-) diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index e80d5bf..8099324 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -70,7 +70,7 @@ files="MockAdminClient.java"/> <suppress checks="JavaNCSS" - files="RequestResponseTest.java"/> + files="RequestResponseTest.java|FetcherTest.java"/> <suppress checks="NPathComplexity" files="MemoryRecordsTest|MetricsTest"/> diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index dd412ab..d1ec117 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -422,7 +422,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { if (value.partitionsToRetry.isEmpty()) return result; - remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet()); + remainingToSearch.keySet().retainAll(value.partitionsToRetry); } else if (!future.isRetriable()) { throw future.exception(); } @@ -590,7 +590,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { for (TopicPartition tp : partitionResetTimestamps.keySet()) metadata.add(tp.topic()); - Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(partitionResetTimestamps); + Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = + groupListOffsetRequests(partitionResetTimestamps, new 
HashSet<>()); for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) { Node node = entry.getKey(); final Map<TopicPartition, Long> resetTimestamps = entry.getValue(); @@ -639,18 +640,19 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { for (TopicPartition tp : timestampsToSearch.keySet()) metadata.add(tp.topic()); - Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = groupListOffsetRequests(timestampsToSearch); + final Set<TopicPartition> partitionsToRetry = new HashSet<>(); + Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = + groupListOffsetRequests(timestampsToSearch, partitionsToRetry); if (timestampsToSearchByNode.isEmpty()) return RequestFuture.failure(new StaleMetadataException()); final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new RequestFuture<>(); final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>(); - final Set<TopicPartition> partitionsToRetry = new HashSet<>(); final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size()); for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) { RequestFuture<ListOffsetResult> future = - sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps); + sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps); future.addListener(new RequestFutureListener<ListOffsetResult>() { @Override public void onSuccess(ListOffsetResult partialResult) { @@ -677,7 +679,16 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { return listOffsetRequestsFuture; } - private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch) { + /** + * Groups timestamps to search by node for topic partitions in `timestampsToSearch` that have + * leaders available. 
Topic partitions from `timestampsToSearch` that do not have their leader + * available are added to `partitionsToRetry` + * @param timestampsToSearch The mapping from partitions to the target timestamps + * @param partitionsToRetry A set of topic partitions that will be extended with partitions + * that need metadata update or re-connect to the leader. + */ + private Map<Node, Map<TopicPartition, Long>> groupListOffsetRequests( + Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) { final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>(); for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) { TopicPartition tp = entry.getKey(); @@ -686,9 +697,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { metadata.add(tp.topic()); log.debug("Leader for partition {} is unknown for fetching offset", tp); metadata.requestUpdate(); + partitionsToRetry.add(tp); } else if (info.leader() == null) { log.debug("Leader for partition {} is unavailable for fetching offset", tp); metadata.requestUpdate(); + partitionsToRetry.add(tp); } else if (client.isUnavailable(info.leader())) { client.maybeThrowAuthFailure(info.leader()); @@ -696,7 +709,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable { // try again. No need to request a metadata update since the disconnect will have // done so already. 
log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", - info.leader(), tp); + info.leader(), tp); + partitionsToRetry.add(tp); } else { Node node = info.leader(); Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index a734b3e..b67d48e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -108,6 +108,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -1838,6 +1839,48 @@ public class FetcherTest { testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L); } + @Test + public void testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() { + final String anotherTopic = "another-topic"; + final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0); + + client.reset(); + + // Metadata initially has one topic + Cluster cluster = TestUtils.clusterWith(3, topicName, 2); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); + + // The first metadata refresh should contain one topic + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), false); + client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L, 11L), cluster.leaderFor(tp0)); + client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L, 32L), cluster.leaderFor(tp1)); + + // Second metadata refresh should contain two topics + Map<String, Integer> 
partitionNumByTopic = new HashMap<>(); + partitionNumByTopic.put(topicName, 2); + partitionNumByTopic.put(anotherTopic, 1); + Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic); + client.prepareMetadataUpdate(updatedCluster, Collections.<String>emptySet(), false); + client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, 1000L, 54L), cluster.leaderFor(t2p0)); + + Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); + timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP); + timestampToSearch.put(tp1, ListOffsetRequest.LATEST_TIMESTAMP); + timestampToSearch.put(t2p0, ListOffsetRequest.LATEST_TIMESTAMP); + Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = + fetcher.offsetsByTimes(timestampToSearch, Long.MAX_VALUE); + + assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp0, + offsetAndTimestampMap.get(tp0)); + assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + tp1, + offsetAndTimestampMap.get(tp1)); + assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null result for " + t2p0, + offsetAndTimestampMap.get(t2p0)); + assertEquals(11L, offsetAndTimestampMap.get(tp0).offset()); + assertEquals(32L, offsetAndTimestampMap.get(tp1).offset()); + assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset()); + } + @Test(expected = TimeoutException.class) public void testBatchedListOffsetsMetadataErrors() { Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>(); diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index d5a57ee..2b5da4f 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -288,12 +288,8 @@ object ConsumerGroupCommand extends Logging { } getLogEndOffsets(topicPartitions).map { - logEndOffsetResult => - logEndOffsetResult._2 match { - case 
LogOffsetResult.LogOffset(logEndOffset) => getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset)) - case LogOffsetResult.Unknown => getDescribePartitionResult(logEndOffsetResult._1, None) - case LogOffsetResult.Ignore => null - } + case (topicPartition, LogOffsetResult.LogOffset(offset)) => getDescribePartitionResult(topicPartition, Some(offset)) + case (topicPartition, _) => getDescribePartitionResult(topicPartition, None) }.toArray } @@ -353,16 +349,20 @@ object ConsumerGroupCommand extends Logging { private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = { val offsets = getConsumer.endOffsets(topicPartitions.asJava) topicPartitions.map { topicPartition => - val logEndOffset = offsets.get(topicPartition) - topicPartition -> LogOffsetResult.LogOffset(logEndOffset) + Option(offsets.get(topicPartition)) match { + case Some(logEndOffset) => topicPartition -> LogOffsetResult.LogOffset(logEndOffset) + case _ => topicPartition -> LogOffsetResult.Unknown + } }.toMap } private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]): Map[TopicPartition, LogOffsetResult] = { val offsets = getConsumer.beginningOffsets(topicPartitions.asJava) topicPartitions.map { topicPartition => - val logStartOffset = offsets.get(topicPartition) - topicPartition -> LogOffsetResult.LogOffset(logStartOffset) + Option(offsets.get(topicPartition)) match { + case Some(logStartOffset) => topicPartition -> LogOffsetResult.LogOffset(logStartOffset) + case _ => topicPartition -> LogOffsetResult.Unknown + } }.toMap }
