This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7804ea1 KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
7804ea1 is described below
commit 7804ea173bdbb0f401ad0135442c563fb52f895c
Author: Jiangjie Qin <[email protected]>
AuthorDate: Thu Feb 1 08:58:29 2018 -0800
KAFKA-6489; Fetcher.retrieveOffsetsByTimes() should batch the metadata fetch.
Currently, if users call KafkaConsumer.offsetsForTimes() with a large set of
partitions, the consumer adds one topic at a time for the metadata refresh,
triggering one refresh per unknown topic. We should instead add all the topics
to the metadata topics up front and do a single metadata refresh.
Author: Jiangjie Qin <[email protected]>
Reviewers: Jason Gustafson <[email protected]>
Closes #4478 from becketqin/KAFKA-6489
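
For context, a minimal sketch of the caller-side pattern this patch speeds up.
The helper name, topic, and timestamp are illustrative assumptions, not part of
the patch; only partitionsFor() and offsetsForTimes() are real Consumer APIs:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    // Hedged sketch: look up the offsets for one timestamp across every
    // partition of a (hypothetical) topic in a single offsetsForTimes() call.
    static Map<TopicPartition, OffsetAndTimestamp> offsetsAt(Consumer<?, ?> consumer,
                                                             String topic,
                                                             long timestampMs) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (PartitionInfo info : consumer.partitionsFor(topic))
            timestampsToSearch.put(new TopicPartition(info.topic(), info.partition()), timestampMs);
        // With this patch, the consumer adds all of these topics to its metadata
        // before refreshing, so one metadata round trip covers the whole batch.
        return consumer.offsetsForTimes(timestampsToSearch);
    }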
---
.../kafka/clients/consumer/internals/Fetcher.java | 5 +-
.../java/org/apache/kafka/clients/MockClient.java | 18 +++++++-
.../clients/consumer/internals/FetcherTest.java | 53 +++++++++++++---------
3 files changed, 52 insertions(+), 24 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 5dc0b26..6d56139 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -604,11 +604,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                                                          final Map<TopicPartition, Long> timestampsToSearch) {
         // Group the partitions by node.
         final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
+        // Add the topics to the metadata to do a single metadata fetch.
+        for (TopicPartition tp : timestampsToSearch.keySet())
+            metadata.add(tp.topic());
+
         for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
             TopicPartition tp = entry.getKey();
             PartitionInfo info = metadata.fetch().partition(tp);
             if (info == null) {
-                metadata.add(tp.topic());
                 log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                 return RequestFuture.staleMetadata();
             } else if (info.leader() == null) {
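
The key change above is moving metadata.add() out of the unknown-partition
branch and in front of the lookup loop, so every topic of interest is
registered before the first (and only) refresh. As a standalone illustration
of the difference, here is a minimal sketch; MetadataCache is a hypothetical
stand-in for the consumer's Metadata class, not its real API:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical stand-in for the consumer's metadata: one refresh() is one
    // broker round trip that resolves every topic registered so far.
    class MetadataCache {
        private final Set<String> registered = new HashSet<>();
        private final Set<String> resolved = new HashSet<>();
        int refreshes = 0;

        void add(String topic) { registered.add(topic); }
        boolean knows(String topic) { return resolved.contains(topic); }
        void refresh() { resolved.addAll(registered); refreshes++; }
    }

    public class BatchedRefreshDemo {
        public static void main(String[] args) {
            List<String> topics = Arrays.asList("a", "b", "c");

            // Old behavior: register a topic only once its lookup misses, then
            // refresh and retry -- one round trip per unknown topic.
            MetadataCache oneAtATime = new MetadataCache();
            for (String topic : topics) {
                if (!oneAtATime.knows(topic)) {
                    oneAtATime.add(topic);
                    oneAtATime.refresh();
                }
            }

            // New behavior: register the whole batch up front, so a single
            // refresh resolves all topics at once.
            MetadataCache batched = new MetadataCache();
            for (String topic : topics)
                batched.add(topic);
            batched.refresh();

            System.out.println(oneAtATime.refreshes + " refreshes vs " + batched.refreshes); // 3 refreshes vs 1
        }
    }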
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8b33472..d843414 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -198,6 +198,12 @@ public class MockClient implements KafkaClient {
         if (metadataUpdate == null)
             metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds());
         else {
+            if (metadataUpdate.expectMatchRefreshTopics
+                && !metadata.topics().equals(metadataUpdate.cluster.topics())) {
+                throw new IllegalStateException("The metadata topics does not match expectation. "
+                        + "Expected topics: " + metadataUpdate.cluster.topics()
+                        + ", asked topics: " + metadata.topics());
+            }
             this.unavailableTopics = metadataUpdate.unavailableTopics;
             metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds());
         }
@@ -344,7 +350,13 @@ public class MockClient implements KafkaClient {
     }

     public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
-        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics));
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, false));
+    }
+
+    public void prepareMetadataUpdate(Cluster cluster,
+                                      Set<String> unavailableTopics,
+                                      boolean expectMatchMetadataTopics) {
+        metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics, expectMatchMetadataTopics));
     }

     public void setNode(Node node) {
@@ -433,9 +445,11 @@
     private static class MetadataUpdate {
         final Cluster cluster;
         final Set<String> unavailableTopics;
-        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) {
+        final boolean expectMatchRefreshTopics;
+        MetadataUpdate(Cluster cluster, Set<String> unavailableTopics, boolean expectMatchRefreshTopics) {
             this.cluster = cluster;
             this.unavailableTopics = unavailableTopics;
+            this.expectMatchRefreshTopics = expectMatchRefreshTopics;
         }
     }
 }
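
The new three-argument prepareMetadataUpdate() lets a test assert that the
consumer requested metadata for exactly the expected topic set. A sketch of
how a test might use it, assuming a MockClient named client as in FetcherTest
(topic names and partition counts are illustrative):

    // With the third argument set to true, applying this update throws an
    // IllegalStateException if the metadata's topic set differs from the
    // prepared cluster's topics, i.e. if the refresh was not batched across
    // all of the requested topics.
    Map<String, Integer> partitionNumByTopic = new HashMap<>();
    partitionNumByTopic.put("topicA", 2); // hypothetical topics
    partitionNumByTopic.put("topicB", 1);
    Cluster cluster = TestUtils.clusterWith(2, partitionNumByTopic);
    client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);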
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 26d7a50..a3ea793 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer.internals;

 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -1942,41 +1943,51 @@ public class FetcherTest {
         return 1;
     }

-    private void testGetOffsetsForTimesWithError(Errors errorForTp0,
-                                                 Errors errorForTp1,
-                                                 long offsetForTp0,
-                                                 long offsetForTp1,
-                                                 Long expectedOffsetForTp0,
-                                                 Long expectedOffsetForTp1) {
+    private void testGetOffsetsForTimesWithError(Errors errorForP0,
+                                                 Errors errorForP1,
+                                                 long offsetForP0,
+                                                 long offsetForP1,
+                                                 Long expectedOffsetForP0,
+                                                 Long expectedOffsetForP1) {
         client.reset();
-        // Ensure metadata has both partition.
-        Cluster cluster = TestUtils.clusterWith(2, topicName, 2);
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        String topicName2 = "topic2";
+        TopicPartition t2p0 = new TopicPartition(topicName2, 0);
+        // Expect a metadata refresh.
+        metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+                        Collections.<String>emptySet(),
+                        time.milliseconds());
+
+        Map<String, Integer> partitionNumByTopic = new HashMap<>();
+        partitionNumByTopic.put(topicName, 2);
+        partitionNumByTopic.put(topicName2, 1);
+        cluster = TestUtils.clusterWith(2, partitionNumByTopic);
+        // The metadata refresh should contain all the topics.
+        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet(), true);

         // First try should fail due to metadata error.
-        client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, errorForP0, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, errorForP1, offsetForP1, offsetForP1), cluster.leaderFor(tp1));
         // Second try should succeed.
-        client.prepareResponseFrom(listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
-        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
+        client.prepareResponseFrom(listOffsetResponse(t2p0, Errors.NONE, offsetForP0, offsetForP0), cluster.leaderFor(t2p0));
+        client.prepareResponseFrom(listOffsetResponse(tp1, Errors.NONE, offsetForP1, offsetForP1), cluster.leaderFor(tp1));

         Map<TopicPartition, Long> timestampToSearch = new HashMap<>();
-        timestampToSearch.put(tp0, 0L);
+        timestampToSearch.put(t2p0, 0L);
         timestampToSearch.put(tp1, 0L);
         Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);

-        if (expectedOffsetForTp0 == null)
-            assertNull(offsetAndTimestampMap.get(tp0));
+        if (expectedOffsetForP0 == null)
+            assertNull(offsetAndTimestampMap.get(t2p0));
         else {
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).timestamp());
-            assertEquals(expectedOffsetForTp0.longValue(), offsetAndTimestampMap.get(tp0).offset());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).timestamp());
+            assertEquals(expectedOffsetForP0.longValue(), offsetAndTimestampMap.get(t2p0).offset());
         }

-        if (expectedOffsetForTp1 == null)
+        if (expectedOffsetForP1 == null)
             assertNull(offsetAndTimestampMap.get(tp1));
         else {
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
-            assertEquals(expectedOffsetForTp1.longValue(), offsetAndTimestampMap.get(tp1).offset());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).timestamp());
+            assertEquals(expectedOffsetForP1.longValue(), offsetAndTimestampMap.get(tp1).offset());
         }
     }
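
The helper itself is driven by the testGetOffsetsForTimes* cases elsewhere in
FetcherTest; a call of the following shape (error and offset values here are
illustrative, not taken from this patch) exercises both the failed first
attempt and the successful retry:

    // Partition 0's first ListOffsets attempt fails with a retriable error and
    // succeeds on retry at offset 10; partition 1 succeeds immediately at 100.
    testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE,
                                    10L, 100L, 10L, 100L);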
--
To stop receiving notification emails like this one, please contact
[email protected].