This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e2ec2d7 KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in
describe consumer group (#5627)
e2ec2d7 is described below
commit e2ec2d79c8d5adefc0c764583cec47144dbc5705
Author: Anna Povzner <[email protected]>
AuthorDate: Tue Sep 11 10:10:42 2018 -0700
KAFKA-7044; Fix Fetcher.fetchOffsetsByTimes and NPE in describe consumer
group (#5627)
A call to `kafka-consumer-groups --describe --group ...` can result in a
NullPointerException for two reasons:
1) `Fetcher.fetchOffsetsByTimes()` may return too early, without sending a
ListOffsets request for topic partitions that are not in the cached metadata.
2) `ConsumerGroupCommand.getLogEndOffsets()` and `getLogStartOffsets()`
assumed that endOffsets()/beginningOffsets(), which eventually call
Fetcher.fetchOffsetsByTimes(), would return a map containing every topic partition
passed to endOffsets()/beginningOffsets(), with non-null values. Because
of (1), null values were possible when some of the topic partitions were already
known (present in the metadata cache) and others were not (the metadata cache had
no entries for them). [...]
Testing:
-- added unit test to verify fix in Fetcher.fetchOffsetsByTimes()
-- did some manual testing with `kafka-consumer-groups --describe`, which
triggered the NPE. Was not able to reproduce any NPE cases with
DescribeConsumerGroupTest.scala.
Reviewers: Jason Gustafson <[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../kafka/clients/consumer/internals/Fetcher.java | 25 +++++++++----
.../clients/consumer/internals/FetcherTest.java | 43 ++++++++++++++++++++++
.../scala/kafka/admin/ConsumerGroupCommand.scala | 20 +++++-----
4 files changed, 72 insertions(+), 18 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 3ed4a9c..2fa499c 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -70,7 +70,7 @@
files="MockAdminClient.java"/>
<suppress checks="JavaNCSS"
- files="RequestResponseTest.java"/>
+ files="RequestResponseTest.java|FetcherTest.java"/>
<suppress checks="NPathComplexity"
files="MemoryRecordsTest|MetricsTest"/>
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index a92f57e..c84dd6f 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -414,7 +414,7 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
if (value.partitionsToRetry.isEmpty())
return result;
-
remainingToSearch.keySet().removeAll(result.fetchedOffsets.keySet());
+ remainingToSearch.keySet().retainAll(value.partitionsToRetry);
} else if (!future.isRetriable()) {
throw future.exception();
}
@@ -575,7 +575,7 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
metadata.add(tp.topic());
Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>
timestampsToSearchByNode =
- groupListOffsetRequests(partitionResetTimestamps);
+ groupListOffsetRequests(partitionResetTimestamps, new
HashSet<>());
for (Map.Entry<Node, Map<TopicPartition,
ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
Node node = entry.getKey();
final Map<TopicPartition, ListOffsetRequest.PartitionData>
resetTimestamps = entry.getValue();
@@ -624,19 +624,19 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
for (TopicPartition tp : timestampsToSearch.keySet())
metadata.add(tp.topic());
+ final Set<TopicPartition> partitionsToRetry = new HashSet<>();
Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>
timestampsToSearchByNode =
- groupListOffsetRequests(timestampsToSearch);
+ groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
if (timestampsToSearchByNode.isEmpty())
return RequestFuture.failure(new StaleMetadataException());
final RequestFuture<ListOffsetResult> listOffsetRequestsFuture = new
RequestFuture<>();
final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new
HashMap<>();
- final Set<TopicPartition> partitionsToRetry = new HashSet<>();
final AtomicInteger remainingResponses = new
AtomicInteger(timestampsToSearchByNode.size());
for (Map.Entry<Node, Map<TopicPartition,
ListOffsetRequest.PartitionData>> entry : timestampsToSearchByNode.entrySet()) {
RequestFuture<ListOffsetResult> future =
- sendListOffsetRequest(entry.getKey(), entry.getValue(),
requireTimestamps);
+ sendListOffsetRequest(entry.getKey(), entry.getValue(),
requireTimestamps);
future.addListener(new RequestFutureListener<ListOffsetResult>() {
@Override
public void onSuccess(ListOffsetResult partialResult) {
@@ -663,8 +663,16 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
return listOffsetRequestsFuture;
}
+ /**
+ * Groups timestamps to search by node for topic partitions in
`timestampsToSearch` that have
+ * leaders available. Topic partitions from `timestampsToSearch` that do
not have their leader
+ * available are added to `partitionsToRetry`
+ * @param timestampsToSearch The mapping from partitions to the target
timestamps
+ * @param partitionsToRetry A set of topic partitions that will be
extended with partitions
+ * that need metadata update or re-connect to the
leader.
+ */
private Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>
groupListOffsetRequests(
- Map<TopicPartition, Long> timestampsToSearch) {
+ Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition>
partitionsToRetry) {
final Map<Node, Map<TopicPartition, ListOffsetRequest.PartitionData>>
timestampsToSearchByNode = new HashMap<>();
for (Map.Entry<TopicPartition, Long> entry:
timestampsToSearch.entrySet()) {
TopicPartition tp = entry.getKey();
@@ -673,9 +681,11 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
metadata.add(tp.topic());
log.debug("Leader for partition {} is unknown for fetching
offset", tp);
metadata.requestUpdate();
+ partitionsToRetry.add(tp);
} else if (info.leader() == null) {
log.debug("Leader for partition {} is unavailable for fetching
offset", tp);
metadata.requestUpdate();
+ partitionsToRetry.add(tp);
} else if (client.isUnavailable(info.leader())) {
client.maybeThrowAuthFailure(info.leader());
@@ -683,7 +693,8 @@ public class Fetcher<K, V> implements
SubscriptionState.Listener, Closeable {
// try again. No need to request a metadata update since the
disconnect will have
// done so already.
log.debug("Leader {} for partition {} is unavailable for
fetching offset until reconnect backoff expires",
- info.leader(), tp);
+ info.leader(), tp);
+ partitionsToRetry.add(tp);
} else {
Node node = info.leader();
Map<TopicPartition, ListOffsetRequest.PartitionData> topicData
=
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index afe5b2f..3bf3deb 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -108,6 +108,7 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -1938,6 +1939,48 @@ public class FetcherTest {
testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE,
Errors.NONE, 10L, 100L, 10L, 100L);
}
+ @Test
+ public void
testGetOffsetsForTimesWhenSomeTopicPartitionLeadersNotKnownInitially() {
+ final String anotherTopic = "another-topic";
+ final TopicPartition t2p0 = new TopicPartition(anotherTopic, 0);
+
+ client.reset();
+
+ // Metadata initially has one topic
+ Cluster cluster = TestUtils.clusterWith(3, topicName, 2);
+ metadata.update(cluster, Collections.<String>emptySet(),
time.milliseconds());
+
+ // The first metadata refresh should contain one topic
+ client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(),
false);
+ client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, 1000L,
11L), cluster.leaderFor(tp0));
+ client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, 1000L,
32L), cluster.leaderFor(tp1));
+
+ // Second metadata refresh should contain two topics
+ Map<String, Integer> partitionNumByTopic = new HashMap<>();
+ partitionNumByTopic.put(topicName, 2);
+ partitionNumByTopic.put(anotherTopic, 1);
+ Cluster updatedCluster = TestUtils.clusterWith(3, partitionNumByTopic);
+ client.prepareMetadataUpdate(updatedCluster,
Collections.<String>emptySet(), false);
+ client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE,
1000L, 54L), cluster.leaderFor(t2p0));
+
+ Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
+ timestampToSearch.put(tp0, ListOffsetRequest.LATEST_TIMESTAMP);
+ timestampToSearch.put(tp1, ListOffsetRequest.LATEST_TIMESTAMP);
+ timestampToSearch.put(t2p0, ListOffsetRequest.LATEST_TIMESTAMP);
+ Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap =
+ fetcher.offsetsByTimes(timestampToSearch,
time.timer(Long.MAX_VALUE));
+
+ assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null
result for " + tp0,
+ offsetAndTimestampMap.get(tp0));
+ assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null
result for " + tp1,
+ offsetAndTimestampMap.get(tp1));
+ assertNotNull("Expect Fetcher.offsetsByTimes() to return non-null
result for " + t2p0,
+ offsetAndTimestampMap.get(t2p0));
+ assertEquals(11L, offsetAndTimestampMap.get(tp0).offset());
+ assertEquals(32L, offsetAndTimestampMap.get(tp1).offset());
+ assertEquals(54L, offsetAndTimestampMap.get(t2p0).offset());
+ }
+
@Test(expected = TimeoutException.class)
public void testBatchedListOffsetsMetadataErrors() {
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData =
new HashMap<>();
diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 1d61720..c0f6797 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -292,12 +292,8 @@ object ConsumerGroupCommand extends Logging {
}
getLogEndOffsets(topicPartitions).map {
- logEndOffsetResult =>
- logEndOffsetResult._2 match {
- case LogOffsetResult.LogOffset(logEndOffset) =>
getDescribePartitionResult(logEndOffsetResult._1, Some(logEndOffset))
- case LogOffsetResult.Unknown =>
getDescribePartitionResult(logEndOffsetResult._1, None)
- case LogOffsetResult.Ignore => null
- }
+ case (topicPartition, LogOffsetResult.LogOffset(offset)) =>
getDescribePartitionResult(topicPartition, Some(offset))
+ case (topicPartition, _) => getDescribePartitionResult(topicPartition,
None)
}.toArray
}
@@ -399,16 +395,20 @@ object ConsumerGroupCommand extends Logging {
private def getLogEndOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer.endOffsets(topicPartitions.asJava)
topicPartitions.map { topicPartition =>
- val logEndOffset = offsets.get(topicPartition)
- topicPartition -> LogOffsetResult.LogOffset(logEndOffset)
+ Option(offsets.get(topicPartition)) match {
+ case Some(logEndOffset) => topicPartition ->
LogOffsetResult.LogOffset(logEndOffset)
+ case _ => topicPartition -> LogOffsetResult.Unknown
+ }
}.toMap
}
private def getLogStartOffsets(topicPartitions: Seq[TopicPartition]):
Map[TopicPartition, LogOffsetResult] = {
val offsets = getConsumer.beginningOffsets(topicPartitions.asJava)
topicPartitions.map { topicPartition =>
- val logStartOffset = offsets.get(topicPartition)
- topicPartition -> LogOffsetResult.LogOffset(logStartOffset)
+ Option(offsets.get(topicPartition)) match {
+ case Some(logStartOffset) => topicPartition ->
LogOffsetResult.LogOffset(logStartOffset)
+ case _ => topicPartition -> LogOffsetResult.Unknown
+ }
}.toMap
}