Repository: kafka Updated Branches: refs/heads/trunk 79aaf19f2 -> 0cee0c321
KAFKA-2948; Remove unused topics from producer metadata set. If no messages are sent to a topic during the last refresh interval or if UNKNOWN_TOPIC_OR_PARTITION error is received, remove the topic from the metadata list. Topics are added to the list on the next attempt to send a message to the topic. Author: Rajini Sivaram <[email protected]> Author: rsivaram <[email protected]> Reviewers: Jason Gustafson <[email protected]>, Ewen Cheslack-Postava <[email protected]>, Ismael Juma <[email protected]> Closes #645 from rajinisivaram/KAFKA-2948 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0cee0c32 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0cee0c32 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0cee0c32 Branch: refs/heads/trunk Commit: 0cee0c321897b4fca4409651fdf28188870cb2f0 Parents: 79aaf19 Author: Rajini Sivaram <[email protected]> Authored: Mon Jun 6 19:53:53 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Mon Jun 6 19:55:50 2016 +0100 ---------------------------------------------------------------------- .../java/org/apache/kafka/clients/Metadata.java | 64 ++++++++++++++++---- .../kafka/clients/producer/KafkaProducer.java | 8 +-- .../producer/internals/RecordAccumulator.java | 20 +++--- .../clients/producer/internals/Sender.java | 8 ++- .../kafka/common/requests/MetadataResponse.java | 17 ++++-- .../org/apache/kafka/clients/MetadataTest.java | 63 +++++++++++++++++++ .../internals/ConsumerCoordinatorTest.java | 31 ++++++++++ .../clients/producer/internals/SenderTest.java | 39 +++++++++++- 8 files changed, 216 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/Metadata.java ---------------------------------------------------------------------- diff --git 
a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 322ae0f..54b19a3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -15,9 +15,13 @@ package org.apache.kafka.clients; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.Set; + import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; @@ -32,11 +36,18 @@ import org.slf4j.LoggerFactory; * * Metadata is maintained for only a subset of topics, which can be added to over time. When we request metadata for a * topic we don't have any metadata for it will trigger a metadata update. + * <p> + * If topic expiry is enabled for the metadata, any topic that has not been used within the expiry interval + * is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly + * manage topics while producers rely on topic expiry to limit the refresh set. 
*/ public final class Metadata { private static final Logger log = LoggerFactory.getLogger(Metadata.class); + public static final long TOPIC_EXPIRY_MS = 5 * 60 * 1000; + private static final long TOPIC_EXPIRY_NEEDS_UPDATE = -1L; + private final long refreshBackoffMs; private final long metadataExpireMs; private int version; @@ -44,9 +55,11 @@ public final class Metadata { private long lastSuccessfulRefreshMs; private Cluster cluster; private boolean needUpdate; - private final Set<String> topics; + /* Topics with expiry time */ + private final Map<String, Long> topics; private final List<Listener> listeners; private boolean needMetadataForAllTopics; + private final boolean topicExpiryEnabled; /** * Create a metadata instance with reasonable defaults @@ -55,21 +68,27 @@ public final class Metadata { this(100L, 60 * 60 * 1000L); } + public Metadata(long refreshBackoffMs, long metadataExpireMs) { + this(refreshBackoffMs, metadataExpireMs, false); + } + /** * Create a new Metadata instance * @param refreshBackoffMs The minimum amount of time that must expire between metadata refreshes to avoid busy * polling * @param metadataExpireMs The maximum amount of time that metadata can be retained without refresh + * @param topicExpiryEnabled If true, enable expiry of unused topics */ - public Metadata(long refreshBackoffMs, long metadataExpireMs) { + public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean topicExpiryEnabled) { this.refreshBackoffMs = refreshBackoffMs; this.metadataExpireMs = metadataExpireMs; + this.topicExpiryEnabled = topicExpiryEnabled; this.lastRefreshMs = 0L; this.lastSuccessfulRefreshMs = 0L; this.version = 0; this.cluster = Cluster.empty(); this.needUpdate = false; - this.topics = new HashSet<String>(); + this.topics = new HashMap<>(); this.listeners = new ArrayList<>(); this.needMetadataForAllTopics = false; } @@ -82,10 +101,11 @@ public final class Metadata { } /** - * Add the topic to maintain in the metadata + * Add the topic to 
maintain in the metadata. If topic expiry is enabled, expiry time + * will be reset on the next update. */ public synchronized void add(String topic) { - topics.add(topic); + topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE); } /** @@ -135,21 +155,24 @@ public final class Metadata { } /** - * Replace the current set of topics maintained to the one provided + * Replace the current set of topics maintained to the one provided. + * If topic expiry is enabled, expiry time of the topics will be + * reset on the next update. * @param topics */ public synchronized void setTopics(Collection<String> topics) { - if (!this.topics.containsAll(topics)) + if (!this.topics.keySet().containsAll(topics)) requestUpdate(); this.topics.clear(); - this.topics.addAll(topics); + for (String topic : topics) + this.topics.put(topic, TOPIC_EXPIRY_NEEDS_UPDATE); } /** * Get the list of topics we are currently maintaining metadata for */ public synchronized Set<String> topics() { - return new HashSet<String>(this.topics); + return new HashSet<>(this.topics.keySet()); } /** @@ -158,11 +181,12 @@ public final class Metadata { * @return true if the topic exists, false otherwise */ public synchronized boolean containsTopic(String topic) { - return this.topics.contains(topic); + return this.topics.containsKey(topic); } /** - * Update the cluster metadata + * Updates the cluster metadata. If topic expiry is enabled, expiry time + * is set for topics if required and expired topics are removed from the metadata. */ public synchronized void update(Cluster cluster, long now) { this.needUpdate = false; @@ -170,6 +194,20 @@ public final class Metadata { this.lastSuccessfulRefreshMs = now; this.version += 1; + if (topicExpiryEnabled) { + // Handle expiry of topics from the metadata refresh set. 
+ for (Iterator<Map.Entry<String, Long>> it = topics.entrySet().iterator(); it.hasNext(); ) { + Map.Entry<String, Long> entry = it.next(); + long expireMs = entry.getValue(); + if (expireMs == TOPIC_EXPIRY_NEEDS_UPDATE) + entry.setValue(now + TOPIC_EXPIRY_MS); + else if (expireMs <= now) { + it.remove(); + log.debug("Removing unused topic {} from the metadata list, expiryMs {} now {}", entry.getKey(), expireMs, now); + } + } + } + for (Listener listener: listeners) listener.onMetadataUpdate(cluster); @@ -251,9 +289,9 @@ public final class Metadata { List<Node> nodes = Collections.emptyList(); if (cluster != null) { unauthorizedTopics.addAll(cluster.unauthorizedTopics()); - unauthorizedTopics.retainAll(this.topics); + unauthorizedTopics.retainAll(this.topics.keySet()); - for (String topic : this.topics) { + for (String topic : this.topics.keySet()) { partitionInfos.addAll(cluster.partitionsForTopic(topic)); } nodes = cluster.nodes(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 91697c1..a1bdb42 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -224,7 +224,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { this.metrics = new Metrics(metricConfig, reporters, time); this.partitioner = config.getConfiguredInstance(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class); long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG); - this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG)); + this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true); this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG); this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG); this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG)); @@ -511,10 +511,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @return The amount of time we waited in ms */ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { - // add topic to metadata topic list if it is not there already. - if (!this.metadata.containsTopic(topic)) - this.metadata.add(topic); - + // add topic to metadata topic list if it is not there already and reset expiry + this.metadata.add(topic); if (metadata.fetch().partitionsForTopic(topic) != null) return 0; http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index a73d882..fa1e513 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -299,7 +299,7 @@ public final class RecordAccumulator { public ReadyCheckResult ready(Cluster cluster, long nowMs) { Set<Node> readyNodes = new HashSet<>(); long nextReadyCheckDelayMs = Long.MAX_VALUE; - boolean unknownLeadersExist = false; + Set<String> unknownLeaderTopics = new HashSet<>(); boolean exhausted = this.free.queued() > 0; for (Map.Entry<TopicPartition, Deque<RecordBatch>> entry : this.batches.entrySet()) { @@ -307,10 +307,12 @@ public final class RecordAccumulator { Deque<RecordBatch> deque 
= entry.getValue(); Node leader = cluster.leaderFor(part); - if (leader == null) { - unknownLeadersExist = true; - } else if (!readyNodes.contains(leader) && !muted.contains(part)) { - synchronized (deque) { + synchronized (deque) { + if (leader == null && !deque.isEmpty()) { + // This is a partition for which leader is not known, but messages are available to send. + // Note that entries are currently not removed from batches when deque is empty. + unknownLeaderTopics.add(part.topic()); + } else if (!readyNodes.contains(leader) && !muted.contains(part)) { RecordBatch batch = deque.peekFirst(); if (batch != null) { boolean backingOff = batch.attempts > 0 && batch.lastAttemptMs + retryBackoffMs > nowMs; @@ -333,7 +335,7 @@ public final class RecordAccumulator { } } - return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeadersExist); + return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics); } /** @@ -549,12 +551,12 @@ public final class RecordAccumulator { public final static class ReadyCheckResult { public final Set<Node> readyNodes; public final long nextReadyCheckDelayMs; - public final boolean unknownLeadersExist; + public final Set<String> unknownLeaderTopics; - public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, boolean unknownLeadersExist) { + public ReadyCheckResult(Set<Node> readyNodes, long nextReadyCheckDelayMs, Set<String> unknownLeaderTopics) { this.readyNodes = readyNodes; this.nextReadyCheckDelayMs = nextReadyCheckDelayMs; - this.unknownLeadersExist = unknownLeadersExist; + this.unknownLeaderTopics = unknownLeaderTopics; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 29077b6..f1852b5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -175,8 +175,14 @@ public class Sender implements Runnable { RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update - if (result.unknownLeadersExist) + if (!result.unknownLeaderTopics.isEmpty()) { + // The set of topics with unknown leader contains topics with leader election pending as well as + // topics which may have expired. Add the topic again to metadata to ensure it is included + // and request metadata update, since there are messages to send to the topic. + for (String topic : result.unknownLeaderTopics) + this.metadata.add(topic); this.metadata.requestUpdate(); + } // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java index 09a5bee..78b35f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java @@ -236,11 +236,22 @@ public class MetadataResponse extends AbstractRequestResponse { } /** + * Returns the set of topics with the specified error + */ + public Set<String> topicsByError(Errors error) { + Set<String> errorTopics = new HashSet<>(); + for (TopicMetadata metadata : topicMetadata) { + if (metadata.error == error) + 
errorTopics.add(metadata.topic()); + } + return errorTopics; + } + + /** * Get a snapshot of the cluster metadata from this response * @return the cluster snapshot */ public Cluster cluster() { - Set<String> unauthorizedTopics = new HashSet<>(); List<PartitionInfo> partitions = new ArrayList<>(); for (TopicMetadata metadata : topicMetadata) { if (metadata.error == Errors.NONE) { @@ -251,12 +262,10 @@ public class MetadataResponse extends AbstractRequestResponse { partitionMetadata.leader, partitionMetadata.replicas.toArray(new Node[0]), partitionMetadata.isr.toArray(new Node[0]))); - } else if (metadata.error == Errors.TOPIC_AUTHORIZATION_FAILED) { - unauthorizedTopics.add(metadata.topic); } } - return new Cluster(this.brokers, partitions, unauthorizedTopics); + return new Cluster(this.brokers, partitions, topicsByError(Errors.TOPIC_AUTHORIZATION_FAILED)); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java index 0493eb2..5defb13 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java @@ -204,6 +204,69 @@ public class MetadataTest { new HashSet<>(Arrays.asList("topic", "topic1")), topics); } + @Test + public void testTopicExpiry() throws Exception { + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true); + + // Test that topic is expired if not used within the expiry interval + long time = 0; + metadata.add("topic1"); + metadata.update(Cluster.empty(), time); + time += Metadata.TOPIC_EXPIRY_MS; + metadata.update(Cluster.empty(), time); + assertFalse("Unused topic not expired", metadata.containsTopic("topic1")); + + // Test that topic is not expired if used within the 
expiry interval + metadata.add("topic2"); + metadata.update(Cluster.empty(), time); + for (int i = 0; i < 3; i++) { + time += Metadata.TOPIC_EXPIRY_MS / 2; + metadata.update(Cluster.empty(), time); + assertTrue("Topic expired even though in use", metadata.containsTopic("topic2")); + metadata.add("topic2"); + } + + // Test that topics added using setTopics expire + HashSet<String> topics = new HashSet<>(); + topics.add("topic4"); + metadata.setTopics(topics); + metadata.update(Cluster.empty(), time); + time += Metadata.TOPIC_EXPIRY_MS; + metadata.update(Cluster.empty(), time); + assertFalse("Unused topic not expired", metadata.containsTopic("topic4")); + } + + @Test + public void testNonExpiringMetadata() throws Exception { + metadata = new Metadata(refreshBackoffMs, metadataExpireMs, false); + + // Test that topic is not expired if not used within the expiry interval + long time = 0; + metadata.add("topic1"); + metadata.update(Cluster.empty(), time); + time += Metadata.TOPIC_EXPIRY_MS; + metadata.update(Cluster.empty(), time); + assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic1")); + + // Test that topic is not expired if used within the expiry interval + metadata.add("topic2"); + metadata.update(Cluster.empty(), time); + for (int i = 0; i < 3; i++) { + time += Metadata.TOPIC_EXPIRY_MS / 2; + metadata.update(Cluster.empty(), time); + assertTrue("Topic expired even though in use", metadata.containsTopic("topic2")); + metadata.add("topic2"); + } + + // Test that topics added using setTopics don't expire + HashSet<String> topics = new HashSet<>(); + topics.add("topic4"); + metadata.setTopics(topics); + time += metadataExpireMs * 2; + metadata.update(Cluster.empty(), time); + assertTrue("Unused topic expired when expiry disabled", metadata.containsTopic("topic4")); + } + private Thread asyncFetch(final String topic) { Thread thread = new Thread() { public void run() { 
http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index fc5c929..040824f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -1113,6 +1113,37 @@ public class ConsumerCoordinatorTest { } } + @Test + public void testMetadataTopicsExpiryDisabled() { + final String consumerId = "consumer"; + + subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + HashSet<String> topics = new HashSet<>(); + topics.add(topicName); + metadata.setTopics(topics); + subscriptions.needReassignment(); + + client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorReady(); + + client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + coordinator.ensurePartitionAssignment(); + + metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); + assertTrue("Topic not found in metadata", metadata.containsTopic(topicName)); + time.sleep(Metadata.TOPIC_EXPIRY_MS * 2); + metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); + assertTrue("Topic expired", metadata.containsTopic(topicName)); + metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); + metadata.update(Cluster.empty(), time.milliseconds()); + assertTrue("Topic expired", metadata.containsTopic(topicName)); + + 
assertTrue(subscriptions.partitionAssignmentNeeded()); + metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); + assertTrue(subscriptions.partitionAssignmentNeeded()); + } + private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics, http://git-wip-us.apache.org/repos/asf/kafka/blob/0cee0c32/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index fb67747..b8a086b 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -13,8 +13,8 @@ package org.apache.kafka.clients.producer.internals; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.util.Collections; @@ -60,7 +60,7 @@ public class SenderTest { private MockTime time = new MockTime(); private MockClient client = new MockClient(time); private int batchSize = 16 * 1024; - private Metadata metadata = new Metadata(0, Long.MAX_VALUE); + private Metadata metadata = new Metadata(0, Long.MAX_VALUE, true); private Cluster cluster = TestUtils.singletonCluster("test", 1); private Metrics metrics = null; private RecordAccumulator accumulator = null; @@ -226,7 +226,42 @@ public class SenderTest { } finally { m.close(); } + } + + /** + * Tests that topics are added to the metadata list when messages are available to send + * and expired if not used during a metadata refresh interval. 
+ */ + @Test + public void testMetadataTopicExpiry() throws Exception { + long offset = 0; + metadata.update(Cluster.empty(), time.milliseconds()); + + Future<RecordMetadata> future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic())); + metadata.update(cluster, time.milliseconds()); + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0)); + sender.run(time.milliseconds()); + assertEquals("Request completed.", 0, client.inFlightRequestCount()); + sender.run(time.milliseconds()); + assertTrue("Request should be completed", future.isDone()); + assertTrue("Topic not retained in metadata list", metadata.containsTopic(tp.topic())); + time.sleep(Metadata.TOPIC_EXPIRY_MS); + metadata.update(Cluster.empty(), time.milliseconds()); + assertFalse("Unused topic has not been expired", metadata.containsTopic(tp.topic())); + future = accumulator.append(tp, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future; + sender.run(time.milliseconds()); + assertTrue("Topic not added to metadata", metadata.containsTopic(tp.topic())); + metadata.update(cluster, time.milliseconds()); + sender.run(time.milliseconds()); // send produce request + client.respond(produceResponse(tp, offset++, Errors.NONE.code(), 0)); + sender.run(time.milliseconds()); + assertEquals("Request completed.", 0, client.inFlightRequestCount()); + sender.run(time.milliseconds()); + assertTrue("Request should be completed", future.isDone()); } private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
