Repository: kafka Updated Branches: refs/heads/0.10.2 928789bf7 -> 94938f373
KAFKA-4631; Request metadata in consumer if topic/partitions unavailable If leader node of one or more partitions in a consumer subscription are temporarily unavailable, request metadata refresh so that partitions skipped for assignment don't have to wait for metadata expiry before reassignment. Metadata refresh is also requested if a subscribed topic or assigned partition doesn't exist. Author: Rajini Sivaram <[email protected]> Reviewers: Vahid Hashemian <[email protected]>, Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #2622 from rajinisivaram/KAFKA-4631 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/94938f37 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/94938f37 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/94938f37 Branch: refs/heads/0.10.2 Commit: 94938f3734c71c69fe14612052ca16ecc0205d4d Parents: 928789b Author: Rajini Sivaram <[email protected]> Authored: Thu Mar 2 17:49:01 2017 -0800 Committer: Jason Gustafson <[email protected]> Committed: Thu Mar 16 14:45:24 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/Metadata.java | 18 +++- .../org/apache/kafka/clients/NetworkClient.java | 2 +- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../consumer/internals/ConsumerCoordinator.java | 6 +- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../kafka/common/requests/MetadataResponse.java | 24 +++++ .../org/apache/kafka/clients/MetadataTest.java | 66 +++++++------- .../org/apache/kafka/clients/MockClient.java | 33 ++++--- .../apache/kafka/clients/NetworkClientTest.java | 2 +- .../clients/consumer/KafkaConsumerTest.java | 38 ++++---- .../internals/AbstractCoordinatorTest.java | 2 +- .../internals/ConsumerCoordinatorTest.java | 94 ++++++++++++++++---- .../clients/consumer/internals/FetcherTest.java | 4 +- .../clients/producer/internals/SenderTest.java | 14 +-- 
.../runtime/distributed/WorkerGroupMember.java | 3 +- .../distributed/WorkerCoordinatorTest.java | 2 +- .../main/scala/kafka/admin/AdminClient.scala | 2 +- .../kafka/api/ConsumerBounceTest.scala | 59 ++++++++++-- .../processor/internals/StreamsKafkaClient.java | 4 +- 19 files changed, 270 insertions(+), 107 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 75d48ab..428684d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -195,8 +195,13 @@ public final class Metadata { /** * Updates the cluster metadata. If topic expiry is enabled, expiry time * is set for topics if required and expired topics are removed from the metadata. 
+ * + * @param cluster the cluster containing metadata for topics with valid metadata + * @param unavailableTopics topics which are non-existent or have one or more partitions whose + * leader is not known + * @param now current time in milliseconds */ - public synchronized void update(Cluster cluster, long now) { + public synchronized void update(Cluster cluster, Set<String> unavailableTopics, long now) { Objects.requireNonNull(cluster, "cluster should not be null"); this.needUpdate = false; @@ -219,7 +224,7 @@ public final class Metadata { } for (Listener listener: listeners) - listener.onMetadataUpdate(cluster); + listener.onMetadataUpdate(cluster, unavailableTopics); String previousClusterId = cluster.clusterResource().clusterId(); @@ -302,7 +307,14 @@ public final class Metadata { * MetadataUpdate Listener */ public interface Listener { - void onMetadataUpdate(Cluster cluster); + /** + * Callback invoked on metadata update. + * + * @param cluster the cluster containing metadata for topics with valid metadata + * @param unavailableTopics topics which are non-existent or have one or more partitions whose + * leader is not known + */ + void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics); } private synchronized void requestUpdateForNewTopics() { http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 3a75288..1e8dbd3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -709,7 +709,7 @@ public class NetworkClient implements KafkaClient { // don't update the cluster if there are no valid nodes...the topic we want may still be in the process of being // created 
which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { - this.metadata.update(cluster, now); + this.metadata.update(cluster, response.unavailableTopics(), now); } else { log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId()); this.metadata.failedUpdate(now); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index eac9579..64d64ba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -652,7 +652,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), false, clusterResourceListeners); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0); String metricGrpPrefix = "consumer"; ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient netClient = new NetworkClient( http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 8669527..2e37636 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -168,7 +169,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator { private void addMetadataListener() { this.metadata.addListener(new Metadata.Listener() { @Override - public void onMetadataUpdate(Cluster cluster) { + public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) { // if we encounter any unauthorized topics, raise an exception to the user if (!cluster.unauthorizedTopics().isEmpty()) throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); @@ -182,6 +183,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator { if (!snapshot.equals(metadataSnapshot)) metadataSnapshot = snapshot; } + + if (!Collections.disjoint(metadata.topics(), unavailableTopics)) + metadata.requestUpdate(); } }); } http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 29defbb..6f1a3b6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -301,7 +301,7 @@ public 
class KafkaProducer<K, V> implements Producer<K, V> { time); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient client = new NetworkClient( new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), this.metrics, time, "producer", channelBuilder), http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index a8baee5..7209db5 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -15,6 +15,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.errors.InvalidMetadataException; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.ProtoUtils; @@ -262,6 +263,29 @@ public class MetadataResponse extends AbstractResponse { } /** + * Returns the set of topics with an error indicating invalid metadata + * and topics with any partition whose error indicates invalid metadata. + * This includes all non-existent topics specified in the metadata request + * and any topic returned with one or more partitions whose leader is not known. 
+ */ + public Set<String> unavailableTopics() { + Set<String> invalidMetadataTopics = new HashSet<>(); + for (TopicMetadata topicMetadata : this.topicMetadata) { + if (topicMetadata.error.exception() instanceof InvalidMetadataException) + invalidMetadataTopics.add(topicMetadata.topic); + else { + for (PartitionMetadata partitionMetadata : topicMetadata.partitionMetadata) { + if (partitionMetadata.error.exception() instanceof InvalidMetadataException) { + invalidMetadataTopics.add(topicMetadata.topic); + break; + } + } + } + } + return invalidMetadataTopics; + } + + /** * Get a snapshot of the cluster metadata from this response * @return the cluster snapshot */ http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index cfd2a94..db88b77 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -52,7 +52,7 @@ public class MetadataTest { @Test public void testMetadata() throws Exception { long time = 0; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); metadata.requestUpdate(); assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0); @@ -67,7 +67,7 @@ public class MetadataTest { // This simulates the metadata update sequence in KafkaProducer while (t1.isAlive() || t2.isAlive()) { if (metadata.timeToNextUpdate(time) == 0) { - metadata.update(TestUtils.singletonCluster(topic, 1), time); + metadata.update(TestUtils.singletonCluster(topic, 1), Collections.<String>emptySet(), time); time += refreshBackoffMs; } Thread.sleep(1); 
@@ -97,7 +97,7 @@ public class MetadataTest { assertEquals(0, metadata.timeToNextUpdate(now)); // lastSuccessfulRefreshMs updated to now. - metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); // The last update was successful so the remaining time to expire the current metadata should be returned. assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now)); @@ -108,7 +108,7 @@ public class MetadataTest { assertEquals(refreshBackoffMs, metadata.timeToNextUpdate(now)); // Reset needUpdate to false. - metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); assertEquals(largerOfBackoffAndExpire, metadata.timeToNextUpdate(now)); // Both metadataExpireMs and refreshBackoffMs elapsed. @@ -152,13 +152,13 @@ public class MetadataTest { long now = 10000; // New topic added to fetch set and update requested. It should allow immediate update. - metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); metadata.add("new-topic"); assertEquals(0, metadata.timeToNextUpdate(now)); // Even though setTopics called, immediate update isn't necessary if the new topic set isn't // containing a new topic, - metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); metadata.setTopics(metadata.topics()); assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now)); @@ -167,12 +167,12 @@ public class MetadataTest { assertEquals(0, metadata.timeToNextUpdate(now)); // If metadata requested for all topics it should allow immediate update. - metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); metadata.needMetadataForAllTopics(true); assertEquals(0, metadata.timeToNextUpdate(now)); // However if metadata is already capable to serve all topics it shouldn't override backoff. 
- metadata.update(Cluster.empty(), now); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), now); metadata.needMetadataForAllTopics(true); assertEquals(metadataExpireMs, metadata.timeToNextUpdate(now)); } @@ -187,7 +187,7 @@ public class MetadataTest { @Test public void testMetadataUpdateWaitTime() throws Exception { long time = 0; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0); // first try with a max wait time of 0 and ensure that this returns back without waiting forever try { @@ -209,7 +209,7 @@ public class MetadataTest { @Test public void testFailedUpdate() { long time = 100; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertEquals(100, metadata.timeToNextUpdate(1000)); metadata.failedUpdate(1100); @@ -218,14 +218,14 @@ public class MetadataTest { assertEquals(100, metadata.lastSuccessfulUpdate()); metadata.needMetadataForAllTopics(true); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertEquals(100, metadata.timeToNextUpdate(1000)); } @Test public void testUpdateWithNeedMetadataForAllTopics() { long time = 0; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); metadata.needMetadataForAllTopics(true); final List<String> expectedTopics = Collections.singletonList("topic"); @@ -237,7 +237,7 @@ public class MetadataTest { new PartitionInfo("topic1", 0, null, null, null)), Collections.<String>emptySet(), Collections.<String>emptySet()), - 100); + Collections.<String>emptySet(), 100); assertArrayEquals("Metadata got updated with wrong set of topics.", expectedTopics.toArray(), metadata.topics().toArray()); @@ -255,7 +255,7 @@ public class MetadataTest { String hostName = "www.example.com"; Cluster cluster = 
Cluster.bootstrap(Arrays.asList(new InetSocketAddress(hostName, 9002))); - metadata.update(cluster, time); + metadata.update(cluster, Collections.<String>emptySet(), time); assertFalse("ClusterResourceListener should not called when metadata is updated with bootstrap Cluster", MockClusterResourceListener.IS_ON_UPDATE_CALLED.get()); @@ -267,7 +267,7 @@ public class MetadataTest { new PartitionInfo("topic1", 0, null, null, null)), Collections.<String>emptySet(), Collections.<String>emptySet()), - 100); + Collections.<String>emptySet(), 100); assertEquals("MockClusterResourceListener did not get cluster metadata correctly", "dummy", mockClusterListener.clusterResource().clusterId()); @@ -279,10 +279,10 @@ public class MetadataTest { public void testListenerGetsNotifiedOfUpdate() { long time = 0; final Set<String> topics = new HashSet<>(); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); metadata.addListener(new Metadata.Listener() { @Override - public void onMetadataUpdate(Cluster cluster) { + public void onMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) { topics.clear(); topics.addAll(cluster.topics()); } @@ -296,7 +296,7 @@ public class MetadataTest { new PartitionInfo("topic1", 0, null, null, null)), Collections.<String>emptySet(), Collections.<String>emptySet()), - 100); + Collections.<String>emptySet(), 100); assertEquals("Listener did not update topics list correctly", new HashSet<>(Arrays.asList("topic", "topic1")), topics); @@ -306,10 +306,10 @@ public class MetadataTest { public void testListenerCanUnregister() { long time = 0; final Set<String> topics = new HashSet<>(); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); final Metadata.Listener listener = new Metadata.Listener() { @Override - public void onMetadataUpdate(Cluster cluster) { + public void onMetadataUpdate(Cluster cluster, Set<String> 
unavailableTopics) { topics.clear(); topics.addAll(cluster.topics()); } @@ -324,7 +324,7 @@ public class MetadataTest { new PartitionInfo("topic1", 0, null, null, null)), Collections.<String>emptySet(), Collections.<String>emptySet()), - 100); + Collections.<String>emptySet(), 100); metadata.removeListener(listener); @@ -336,7 +336,7 @@ public class MetadataTest { new PartitionInfo("topic3", 0, null, null, null)), Collections.<String>emptySet(), Collections.<String>emptySet()), - 100); + Collections.<String>emptySet(), 100); assertEquals("Listener did not update topics list correctly", new HashSet<>(Arrays.asList("topic", "topic1")), topics); @@ -349,17 +349,17 @@ public class MetadataTest { // Test that topic is expired if not used within the expiry interval long time = 0; metadata.add("topic1"); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); time += Metadata.TOPIC_EXPIRY_MS; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertFalse("Unused topic not expired", metadata.containsTopic("topic1")); // Test that topic is not expired if used within the expiry interval metadata.add("topic2"); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); for (int i = 0; i < 3; i++) { time += Metadata.TOPIC_EXPIRY_MS / 2; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertTrue("Topic expired even though in use", metadata.containsTopic("topic2")); metadata.add("topic2"); } @@ -368,9 +368,9 @@ public class MetadataTest { HashSet<String> topics = new HashSet<>(); topics.add("topic4"); metadata.setTopics(topics); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); time += Metadata.TOPIC_EXPIRY_MS; - metadata.update(Cluster.empty(), time); + 
metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertFalse("Unused topic not expired", metadata.containsTopic("topic4")); } @@ -381,17 +381,17 @@ public class MetadataTest { // Test that topic is not expired if not used within the expiry interval long time = 0; metadata.add("topic1"); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); time += Metadata.TOPIC_EXPIRY_MS; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1")); // Test that topic is not expired if used within the expiry interval metadata.add("topic2"); - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); for (int i = 0; i < 3; i++) { time += Metadata.TOPIC_EXPIRY_MS / 2; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertTrue("Topic expired even though in use", metadata.containsTopic("topic2")); metadata.add("topic2"); } @@ -401,7 +401,7 @@ public class MetadataTest { topics.add("topic4"); metadata.setTopics(topics); time += metadataExpireMs * 2; - metadata.update(Cluster.empty(), time); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time); assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); } http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 50ed131..df29f31 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -26,6 +26,7 @@ import org.apache.kafka.test.TestUtils; import java.util.ArrayDeque; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -63,6 +64,7 @@ public class MockClient implements KafkaClient { private final Time time; private final Metadata metadata; + private Set<String> unavailableTopics; private int correlation = 0; private Node node = null; private final Set<String> ready = new HashSet<>(); @@ -72,17 +74,17 @@ public class MockClient implements KafkaClient { // Use concurrent queue for responses so that responses may be updated during poll() from a different thread. private final Queue<ClientResponse> responses = new ConcurrentLinkedDeque<>(); private final Queue<FutureResponse> futureResponses = new ArrayDeque<>(); - private final Queue<Cluster> metadataUpdates = new ArrayDeque<>(); + private final Queue<MetadataUpdate> metadataUpdates = new ArrayDeque<>(); private volatile NodeApiVersions nodeApiVersions = NodeApiVersions.create(); public MockClient(Time time) { - this.time = time; - this.metadata = null; + this(time, null); } public MockClient(Time time, Metadata metadata) { this.time = time; this.metadata = metadata; + this.unavailableTopics = Collections.emptySet(); } @Override @@ -167,11 +169,13 @@ public class MockClient implements KafkaClient { List<ClientResponse> copy = new ArrayList<>(this.responses); if (metadata != null && metadata.updateRequested()) { - Cluster cluster = metadataUpdates.poll(); - if (cluster == null) - metadata.update(metadata.fetch(), time.milliseconds()); - else - metadata.update(cluster, time.milliseconds()); + MetadataUpdate metadataUpdate = metadataUpdates.poll(); + if (metadataUpdate == null) + metadata.update(metadata.fetch(), this.unavailableTopics, time.milliseconds()); + else { + this.unavailableTopics = metadataUpdate.unavailableTopics; + 
metadata.update(metadataUpdate.cluster, metadataUpdate.unavailableTopics, time.milliseconds()); + } } while (!this.responses.isEmpty()) { @@ -277,8 +281,8 @@ public class MockClient implements KafkaClient { metadataUpdates.clear(); } - public void prepareMetadataUpdate(Cluster cluster) { - metadataUpdates.add(cluster); + public void prepareMetadataUpdate(Cluster cluster, Set<String> unavailableTopics) { + metadataUpdates.add(new MetadataUpdate(cluster, unavailableTopics)); } public void setNode(Node node) { @@ -339,4 +343,13 @@ public class MockClient implements KafkaClient { public void setNodeApiVersions(NodeApiVersions nodeApiVersions) { this.nodeApiVersions = nodeApiVersions; } + + private static class MetadataUpdate { + final Cluster cluster; + final Set<String> unavailableTopics; + MetadataUpdate(Cluster cluster, Set<String> unavailableTopics) { + this.cluster = cluster; + this.unavailableTopics = unavailableTopics; + } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index deaf2cc..a95af83 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -78,7 +78,7 @@ public class NetworkClientTest { @Before public void setup() { - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); } @Test(expected = IllegalStateException.class) http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 4aaa172..f2905a9 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -364,7 +364,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -405,7 +405,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -446,7 +446,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -482,7 +482,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -530,7 +530,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), 
time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -591,7 +591,7 @@ public class KafkaConsumerTest { consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer)); - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); consumer.poll(0); assertEquals(singleton(topic), consumer.subscription()); @@ -622,7 +622,7 @@ public class KafkaConsumerTest { MockClient client = new MockClient(time, metadata); client.setNode(node); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor, rebalanceTimeoutMs, sessionTimeoutMs, heartbeatIntervalMs, false, autoCommitIntervalMs); @@ -630,14 +630,14 @@ public class KafkaConsumerTest { Node coordinator = prepareRebalance(client, node, singleton(topic), assignor, singletonList(tp0), null); consumer.subscribe(Pattern.compile(topic), getConsumerRebalanceListener(consumer)); - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); consumer.poll(0); assertEquals(singleton(topic), consumer.subscription()); consumer.subscribe(Pattern.compile(otherTopic), getConsumerRebalanceListener(consumer)); - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); prepareRebalance(client, node, singleton(otherTopic), assignor, singletonList(otherTopicPartition), coordinator); consumer.poll(0); @@ -660,7 +660,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -705,7 +705,7 @@ 
public class KafkaConsumerTest { final Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); final MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -745,7 +745,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -793,7 +793,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -913,7 +913,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -981,7 +981,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -1046,7 +1046,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); 
MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -1107,7 +1107,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -1226,7 +1226,7 @@ public class KafkaConsumerTest { Node node = cluster.nodes().get(0); Metadata metadata = new Metadata(0, Long.MAX_VALUE); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); MockClient client = new MockClient(time, metadata); client.setNode(node); @@ -1238,7 +1238,7 @@ public class KafkaConsumerTest { consumer.subscribe(Arrays.asList(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, Arrays.asList(tp0), null); - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); // Poll with responses client.prepareResponseFrom(fetchResponse(tp0, 0, 1), node); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index bb617ae..8846b5e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -79,7 +79,7 @@ public class AbstractCoordinatorTest { Metrics metrics = new Metrics(); Cluster cluster = TestUtils.singletonCluster("topic", 
1); - metadata.update(cluster, mockTime.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), mockTime.milliseconds()); this.node = cluster.nodes().get(0); mockClient.setNode(node); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index e11bf30..66fe76d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -120,7 +120,7 @@ public class ConsumerCoordinatorTest { this.time = new MockTime(); this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST); this.metadata = new Metadata(0, Long.MAX_VALUE); - this.metadata.update(cluster, time.milliseconds()); + this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); this.client = new MockClient(time, metadata); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); @@ -292,7 +292,7 @@ public class ConsumerCoordinatorTest { // ensure metadata is up-to-date for leader metadata.setTopics(singletonList(topic1)); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -310,7 +310,7 @@ public class ConsumerCoordinatorTest { // ensure metadata is up-to-date for leader metadata.setTopics(singletonList(topic1)); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, 
Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -349,7 +349,7 @@ public class ConsumerCoordinatorTest { // partially update the metadata with one topic first, // let the leader to refresh metadata during assignment metadata.setTopics(singletonList(topic1)); - metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -369,7 +369,7 @@ public class ConsumerCoordinatorTest { } }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code())); // expect client to force updating the metadata, if yes gives it both topics - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); coordinator.poll(time.milliseconds()); @@ -389,7 +389,7 @@ public class ConsumerCoordinatorTest { subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); metadata.needMetadataForAllTopics(true); - metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds()); assertEquals(singleton(topic1), subscriptions.subscription()); @@ -410,7 +410,7 @@ public class ConsumerCoordinatorTest { final Map<String, Integer> updatedPartitions = new HashMap<>(); for (String topic : updatedSubscription) updatedPartitions.put(topic, 1); - metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds()); + metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.<String>emptySet(), time.milliseconds()); return true; } }, syncGroupResponse(singletonList(t1p), Errors.NONE.code())); @@ -453,7 +453,7 @@ public class ConsumerCoordinatorTest { 
// ensure metadata is up-to-date for leader metadata.setTopics(singletonList(topic1)); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -524,7 +524,7 @@ public class ConsumerCoordinatorTest { // partially update the metadata with one topic first, // let the leader to refresh metadata during assignment metadata.setTopics(singletonList(topic1)); - metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -541,7 +541,7 @@ public class ConsumerCoordinatorTest { } }, syncGroupResponse(Arrays.asList(t1p, t2p), Errors.NONE.code())); // expect client to force updating the metadata, if yes gives it both topics - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); coordinator.joinGroupIfNeeded(); @@ -712,7 +712,7 @@ public class ConsumerCoordinatorTest { // ensure metadata is up-to-date for leader metadata.setTopics(singletonList(topic1)); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); subscriptions.subscribe(singleton(topic1), rebalanceListener); @@ -731,7 +731,7 @@ public class ConsumerCoordinatorTest { assertFalse(coordinator.needRejoin()); // a new partition is added to the topic - metadata.update(TestUtils.singletonCluster(topic1, 2), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topic1, 2), Collections.<String>emptySet(), time.milliseconds()); // we should detect the change and ask for reassignment assertTrue(coordinator.needRejoin()); @@ -751,7 +751,7 @@ public class 
ConsumerCoordinatorTest { metadata.setTopics(topics); // we only have metadata for one topic initially - metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topic1, 1), Collections.<String>emptySet(), time.milliseconds()); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -772,7 +772,7 @@ public class ConsumerCoordinatorTest { Map<String, Integer> topicPartitionCounts = new HashMap<>(); topicPartitionCounts.put(topic1, 1); topicPartitionCounts.put(topic2, 1); - metadata.update(TestUtils.singletonCluster(topicPartitionCounts), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.<String>emptySet(), time.milliseconds()); return true; } return false; @@ -789,12 +789,72 @@ public class ConsumerCoordinatorTest { assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions()); } + @Test + public void testRebalanceAfterTopicUnavailableWithSubscribe() { + unavailableTopicTest(false, false, Collections.<String>emptySet()); + } + + @Test + public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() { + unavailableTopicTest(true, false, Collections.<String>emptySet()); + } + + @Test + public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() { + unavailableTopicTest(true, false, Collections.<String>singleton("notmatching")); + } + + @Test + public void testAssignWithTopicUnavailable() { + unavailableTopicTest(true, false, Collections.<String>emptySet()); + } + + private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) { + final String consumerId = "consumer"; + + metadata.setTopics(singletonList(topic1)); + client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1")); + + if (assign) + subscriptions.assignFromUser(singleton(t1p)); + else if 
(patternSubscribe) + subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener); + else + subscriptions.subscribe(singleton(topic1), rebalanceListener); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorReady(); + + Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1)); + partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap()); + + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Collections.<TopicPartition>emptyList(), Errors.NONE.code())); + coordinator.poll(time.milliseconds()); + if (!assign) { + assertFalse(coordinator.needRejoin()); + assertEquals(Collections.<TopicPartition>emptySet(), rebalanceListener.assigned); + } + assertTrue("Metadata refresh not requested for unavailable partitions", metadata.updateRequested()); + + client.prepareMetadataUpdate(cluster, unavailableTopicsInLastMetadata); + client.poll(0, time.milliseconds()); + client.prepareResponse(joinGroupLeaderResponse(2, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); + coordinator.poll(time.milliseconds()); + + assertFalse("Metadata refresh requested unnecessarily", metadata.updateRequested()); + if (!assign) { + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(t1p), rebalanceListener.assigned); + } + } @Test public void testExcludeInternalTopicsConfigOption() { subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); - metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds()); 
assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME)); } @@ -804,7 +864,7 @@ public class ConsumerCoordinatorTest { coordinator = buildCoordinator(new Metrics(), assignors, false, false); subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); - metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); + metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.<String>emptySet(), time.milliseconds()); assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME)); } @@ -1030,7 +1090,7 @@ public class ConsumerCoordinatorTest { client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE.code())); - client.prepareMetadataUpdate(cluster); + client.prepareMetadataUpdate(cluster, Collections.<String>emptySet()); coordinator.joinGroupIfNeeded(); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index ecaade2..4d388e6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -108,7 +108,7 @@ public class FetcherTest { @Before public void setup() throws Exception { - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); client.setNode(node); MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L); @@ 
-837,7 +837,7 @@ public class FetcherTest { TopicPartition tp1 = new TopicPartition(topicName, 1); // Ensure metadata has both partition. Cluster cluster = TestUtils.clusterWith(2, topicName, 2); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); // First try should fail due to metadata error. client.prepareResponseFrom(listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0)); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 7f5fe15..a3dae66 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -84,7 +84,7 @@ public class SenderTest { time, REQUEST_TIMEOUT); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); } @After @@ -197,7 +197,7 @@ public class SenderTest { // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1 Cluster cluster1 = TestUtils.clusterWith(2, "test", 2); - metadata.update(cluster1, time.milliseconds()); + metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds()); // Send the first message. TopicPartition tp2 = new TopicPartition("test", 1); @@ -216,7 +216,7 @@ public class SenderTest { // Update metadata before sender receives response from broker 0. 
Now partition 2 moves to broker 0 Cluster cluster2 = TestUtils.singletonCluster("test", 2); - metadata.update(cluster2, time.milliseconds()); + metadata.update(cluster2, Collections.<String>emptySet(), time.milliseconds()); // Sender should not send the second message to node 0. sender.run(time.milliseconds()); assertEquals(1, client.inFlightRequestCount()); @@ -232,12 +232,12 @@ public class SenderTest { @Test public void testMetadataTopicExpiry() throws Exception { long offset = 0; - metadata.update(Cluster.empty(), time.milliseconds()); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds()); Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic())); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); sender.run(time.milliseconds()); // send produce request client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); @@ -247,12 +247,12 @@ public class SenderTest { assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic())); time.sleep(Metadata.TOPIC_EXPIRY_MS); - metadata.update(Cluster.empty(), time.milliseconds()); + metadata.update(Cluster.empty(), Collections.<String>emptySet(), time.milliseconds()); assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic())); future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; sender.run(time.milliseconds()); assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic())); - metadata.update(cluster, time.milliseconds()); + metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); sender.run(time.milliseconds()); // send 
produce request client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0)); sender.run(time.milliseconds()); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index ac13472..f6ff665 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -38,6 +38,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.InetSocketAddress; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -88,7 +89,7 @@ public class WorkerGroupMember { this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG)); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), 0); + this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0); String metricGrpPrefix = "connect"; ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config.values()); NetworkClient netClient = new NetworkClient( http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java ---------------------------------------------------------------------- diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java index 92393a1..282e175 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java @@ -91,7 +91,7 @@ public class WorkerCoordinatorTest { this.time = new MockTime(); this.client = new MockClient(time); this.metadata = new Metadata(0, Long.MAX_VALUE); - this.metadata.update(cluster, time.milliseconds()); + this.metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds()); this.consumerClient = new ConsumerNetworkClient(client, metadata, time, 100, 1000); this.metrics = new Metrics(time); this.rebalanceListener = new MockRebalanceListener(); http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/core/src/main/scala/kafka/admin/AdminClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 680c5e1..6ccfe94 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -231,7 +231,7 @@ object AdminClient { val brokerUrls = config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls) val bootstrapCluster = Cluster.bootstrap(brokerAddresses) - metadata.update(bootstrapCluster, 0) + metadata.update(bootstrapCluster, Collections.emptySet(), 0) val selector = new Selector( DefaultConnectionMaxIdleMs, http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala index e848e28..4ec77a1 100644 --- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala +++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala @@ -43,12 +43,13 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { // Time to process commit and leave group requests in tests when brokers are available val gracefulCloseTimeMs = 1000 - val executor = Executors.newFixedThreadPool(2) + val executor = Executors.newScheduledThreadPool(2) // configure the servers and clients this.serverConfig.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp, "3") // don't want to lose offset this.serverConfig.setProperty(KafkaConfig.OffsetsTopicPartitionsProp, "1") this.serverConfig.setProperty(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout + this.serverConfig.setProperty(KafkaConfig.AutoCreateTopicsEnableProp, "false") this.producerConfig.setProperty(ProducerConfig.ACKS_CONFIG, "all") this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-test") this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 4096.toString) @@ -161,6 +162,52 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { } @Test + def testSubscribeWhenTopicUnavailable() { + val numRecords = 1000 + val newtopic = "newtopic" + + val consumer = this.consumers.head + consumer.subscribe(Collections.singleton(newtopic)) + executor.schedule(new Runnable { + def run() = TestUtils.createTopic(zkUtils, newtopic, serverCount, serverCount, servers) + }, 2, TimeUnit.SECONDS) + consumer.poll(0) + + def sendRecords(numRecords: Int, topic: String = this.topic) { + var remainingRecords = numRecords + val endTimeMs = System.currentTimeMillis + 20000 + while (remainingRecords > 0 && System.currentTimeMillis < endTimeMs) { + val futures = (0 until 
remainingRecords).map { i => + this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) + } + futures.map { future => + try { + future.get + remainingRecords -= 1 + } catch { + case _: Exception => + } + } + } + assertEquals(0, remainingRecords) + } + + sendRecords(numRecords, newtopic) + receiveRecords(consumer, numRecords, newtopic, 10000) + + servers.foreach(server => killBroker(server.config.brokerId)) + Thread.sleep(500) + restartDeadBrokers() + + val future = executor.submit(new Runnable { + def run() = receiveRecords(consumer, numRecords, newtopic, 10000) + }) + sendRecords(numRecords, newtopic) + future.get + } + + + @Test def testClose() { val numRecords = 10 sendRecords(numRecords) @@ -311,10 +358,12 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { consumer } - private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) { - var received = 0 - while (received < numRecords) + private def receiveRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int, topic: String = this.topic, timeoutMs: Long = 60000) { + var received = 0L + val endTimeMs = System.currentTimeMillis + timeoutMs + while (received < numRecords && System.currentTimeMillis < endTimeMs) received += consumer.poll(1000).count() + assertEquals(numRecords, received) } private def submitCloseAndValidate(consumer: KafkaConsumer[Array[Byte], Array[Byte]], @@ -372,7 +421,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging { } } - private def sendRecords(numRecords: Int) { + private def sendRecords(numRecords: Int, topic: String = this.topic) { val futures = (0 until numRecords).map { i => this.producers.head.send(new ProducerRecord(topic, part, i.toString.getBytes, i.toString.getBytes)) } http://git-wip-us.apache.org/repos/asf/kafka/blob/94938f37/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java 
---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 94d8854..60dd416 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -95,7 +95,7 @@ public class StreamsKafkaClient { streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG) ); final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - metadata.update(Cluster.bootstrap(addresses), time.milliseconds()); + metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds()); final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG)) .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS) @@ -223,7 +223,7 @@ public class StreamsKafkaClient { streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG)); final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds()); + metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), Time.SYSTEM.milliseconds()); final List<Node> nodes = metadata.fetch().nodes(); return ensureOneNodeIsReady(nodes);
