This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new 1b16888 KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (#5027)
1b16888 is described below
commit 1b16888d462d91a81e130182c8c93552c65d7385
Author: Dhruvil Shah <[email protected]>
AuthorDate: Sat Jul 21 14:03:14 2018 -0700
KAFKA-6897; Prevent KafkaProducer.send from blocking when producer is closed (#5027)
After successful completion of KafkaProducer#close, it is possible that an
application calls KafkaProducer#send. If send is invoked for a topic for which
we do not have any metadata, the producer will block until `max.block.ms`
elapses; we do not expect to receive any metadata update in this case because
the Sender (and NetworkClient) has already exited. It is only when
RecordAccumulator#append is invoked that we notice that the producer has
already been closed and throw an exception [...]
This patch makes sure `Metadata#awaitUpdate` periodically checks if the
network client has been closed, and if so bails out as soon as possible.
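For illustration, a minimal sketch of the user-facing change (the class name,
broker address, topic, and timeout below are placeholders, not part of this
patch): after close() completes, a subsequent send() now fails promptly instead
of stalling for up to `max.block.ms` waiting for a metadata update that can
never arrive.

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class SendAfterCloseSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L); // previously the worst-case stall
            KafkaProducer<String, String> producer =
                    new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
            producer.close();
            try {
                // "uncached-topic" has no cached metadata, so before this patch the call
                // could block for the full max.block.ms; with it, the producer bails out quickly.
                producer.send(new ProducerRecord<>("uncached-topic", "key", "value"));
            } catch (IllegalStateException | KafkaException e) {
                // Expected: IllegalStateException from the new throwIfProducerClosed check, or
                // KafkaException("Producer closed while send in progress") when close() races
                // with a send already blocked in Metadata#awaitUpdate.
                System.out.println("send after close failed fast: " + e);
            }
        }
    }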
---
.../kafka/clients/ManualMetadataUpdater.java | 4 ++
.../java/org/apache/kafka/clients/Metadata.java | 35 ++++++++++++++--
.../org/apache/kafka/clients/MetadataUpdater.java | 9 +++-
.../org/apache/kafka/clients/NetworkClient.java | 6 +++
.../admin/internals/AdminMetadataManager.java | 4 ++
.../kafka/clients/producer/KafkaProducer.java | 25 +++++++++--
.../kafka/clients/producer/MockProducer.java | 3 --
.../producer/internals/RecordAccumulator.java | 7 ++--
.../org/apache/kafka/clients/MetadataTest.java | 40 +++++++++++++++---
.../java/org/apache/kafka/clients/MockClient.java | 1 +
.../kafka/clients/producer/KafkaProducerTest.java | 48 ++++++++++++++++++++++
.../kafka/clients/producer/MockProducerTest.java | 10 -----
core/src/main/scala/kafka/tools/MirrorMaker.scala | 4 +-
.../kafka/api/BaseProducerSendTest.scala | 4 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 2 +-
15 files changed, 167 insertions(+), 35 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
index 8252cf3..ec007a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ManualMetadataUpdater.java
@@ -89,4 +89,8 @@ public class ManualMetadataUpdater implements MetadataUpdater {
public void requestUpdate() {
// Do nothing
}
+
+ @Override
+ public void close() {
+ }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index b1da9de..ec07f13 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -25,6 +26,7 @@ import org.apache.kafka.common.errors.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -48,7 +50,7 @@ import java.util.Set;
* is removed from the metadata refresh set after an update. Consumers disable topic expiry since they explicitly
* manage topics while producers rely on topic expiry to limit the refresh set.
*/
-public final class Metadata {
+public final class Metadata implements Closeable {
private static final Logger log = LoggerFactory.getLogger(Metadata.class);
@@ -70,6 +72,7 @@ public final class Metadata {
private boolean needMetadataForAllTopics;
private final boolean allowAutoTopicCreation;
private final boolean topicExpiryEnabled;
+ private boolean isClosed;
public Metadata(long refreshBackoffMs, long metadataExpireMs, boolean allowAutoTopicCreation) {
    this(refreshBackoffMs, metadataExpireMs, allowAutoTopicCreation, false, new ClusterResourceListeners());
@@ -100,6 +103,7 @@ public final class Metadata {
this.listeners = new ArrayList<>();
this.clusterResourceListeners = clusterResourceListeners;
this.needMetadataForAllTopics = false;
+ this.isClosed = false;
}
/**
@@ -164,12 +168,12 @@ public final class Metadata {
* Wait for metadata update until the current version is larger than the last version we know of
*/
public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException {
-        if (maxWaitMs < 0) {
+        if (maxWaitMs < 0)
            throw new IllegalArgumentException("Max time to wait for metadata updates should not be < 0 milliseconds");
-        }
+
long begin = System.currentTimeMillis();
long remainingWaitMs = maxWaitMs;
- while (this.version <= lastVersion) {
+ while ((this.version <= lastVersion) && !isClosed()) {
AuthenticationException ex = getAndClearAuthenticationException();
if (ex != null)
throw ex;
@@ -180,6 +184,8 @@ public final class Metadata {
throw new TimeoutException("Failed to update metadata after " + maxWaitMs + " ms.");
remainingWaitMs = maxWaitMs - elapsed;
}
+ if (isClosed())
+ throw new KafkaException("Requested metadata update after close");
}
/**
@@ -224,6 +230,8 @@ public final class Metadata {
*/
public synchronized void update(Cluster newCluster, Set<String> unavailableTopics, long now) {
Objects.requireNonNull(newCluster, "cluster should not be null");
+ if (isClosed())
+            throw new IllegalStateException("Update requested after metadata close");
this.needUpdate = false;
this.lastRefreshMs = now;
@@ -332,6 +340,25 @@ public final class Metadata {
}
/**
+ * "Close" this metadata instance to indicate that metadata updates are no
longer possible. This is typically used
+ * when the thread responsible for performing metadata updates is exiting
and needs a way to relay this information
+ * to any other thread(s) that could potentially wait on metadata update
to come through.
+ */
+ @Override
+ public synchronized void close() {
+ this.isClosed = true;
+ this.notifyAll();
+ }
+
+ /**
+     * Check if this metadata instance has been closed. See {@link #close()} for more information.
+ * @return True if this instance has been closed; false otherwise
+ */
+ public synchronized boolean isClosed() {
+ return this.isClosed;
+ }
+
+ /**
* MetadataUpdate Listener
*/
public interface Listener {
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 09ed995..de765db 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
+import java.io.Closeable;
import java.util.List;
/**
@@ -29,7 +30,7 @@ import java.util.List;
* <p>
* This class is not thread-safe!
*/
-public interface MetadataUpdater {
+public interface MetadataUpdater extends Closeable {
/**
* Gets the current cluster info without blocking.
@@ -82,4 +83,10 @@ public interface MetadataUpdater {
* start of the update if possible (see `maybeUpdate` for more information).
*/
void requestUpdate();
+
+ /**
+ * Close this updater.
+ */
+ @Override
+ void close();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 720a781..fd16fe6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -581,6 +581,7 @@ public class NetworkClient implements KafkaClient {
@Override
public void close() {
this.selector.close();
+ this.metadataUpdater.close();
}
/**
@@ -981,6 +982,11 @@ public class NetworkClient implements KafkaClient {
this.metadata.requestUpdate();
}
+ @Override
+ public void close() {
+ this.metadata.close();
+ }
+
/**
* Return true if there's at least one connection establishment is currently underway
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 85d3c28..1ad3991 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -118,6 +118,10 @@ public class AdminMetadataManager {
public void requestUpdate() {
AdminMetadataManager.this.requestUpdate();
}
+
+ @Override
+ public void close() {
+ }
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a5af5b6..fec3345 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -789,12 +789,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
*
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException fatal error indicating that the producer is not allowed to write
- * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started
+ * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started, or
+ *                               when send is invoked after producer has been closed.
* @throws InterruptException If the thread is interrupted while blocked
* @throws SerializationException If the key or value are not valid objects given the configured serializers
* @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>.
* @throws KafkaException If a Kafka related error occurs that does not belong to the public API exceptions.
- *
*/
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
@@ -803,14 +803,29 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
return doSend(interceptedRecord, callback);
}
+    // Verify that this producer instance has not been closed. This method throws IllegalStateException if the producer
+    // has already been closed.
+    private void throwIfProducerClosed() {
+        if (ioThread == null || !ioThread.isAlive())
+            throw new IllegalStateException("Cannot perform operation after producer has been closed");
+    }
+
/**
* Implementation of asynchronously send a record to a topic.
*/
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
+ throwIfProducerClosed();
// first make sure the metadata for the topic is available
-            ClusterAndWaitTime clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+            ClusterAndWaitTime clusterAndWaitTime;
+            try {
+                clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), maxBlockTimeMs);
+            } catch (KafkaException e) {
+                if (metadata.isClosed())
+                    throw new KafkaException("Producer closed while send in progress", e);
+                throw e;
+            }
            long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
byte[] serializedKey;
@@ -895,6 +910,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* @param partition A specific partition expected to exist in metadata, or null if there's no preference
* @param maxWaitMs The maximum time in ms for waiting on the metadata
* @return The cluster containing topic metadata and the amount of time we waited in ms
+ * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long maxWaitMs) throws InterruptedException {
    // add topic to metadata topic list if it is not there already and reset expiry
@@ -1008,8 +1024,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* Get the partition metadata for the given topic. This can be used for custom partitioning.
* @throws AuthenticationException if authentication fails. See the exception for more details
* @throws AuthorizationException if not authorized to the specified topic. See the exception for more details
- * @throws InterruptException If the thread is interrupted while blocked
+ * @throws InterruptException if the thread is interrupted while blocked
* @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
+ * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 9e9869a..dc00b47 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -311,9 +311,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
@Override
public void close(long timeout, TimeUnit timeUnit) {
- if (this.closed) {
- throw new IllegalStateException("MockProducer is already closed.");
- }
this.closed = true;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index e2b5844..31c6d75 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -195,7 +196,7 @@ public final class RecordAccumulator {
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
-                throw new IllegalStateException("Cannot send after the producer is closed.");
+                throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null)
return appendResult;
@@ -209,7 +210,7 @@ public final class RecordAccumulator {
synchronized (dq) {
                // Need to check if producer is closed again after grabbing the dequeue lock.
                if (closed)
-                    throw new IllegalStateException("Cannot send after the producer is closed.");
+                    throw new KafkaException("Producer closed while send in progress");
                RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
if (appendResult != null) {
@@ -700,7 +701,7 @@ public final class RecordAccumulator {
* Go through incomplete batches and abort them.
*/
private void abortBatches() {
-        abortBatches(new IllegalStateException("Producer is closed forcefully."));
+ abortBatches(new KafkaException("Producer is closed forcefully."));
}
/**
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 1188af7..969921e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
@@ -46,7 +47,7 @@ public class MetadataTest {
private long refreshBackoffMs = 100;
private long metadataExpireMs = 1000;
    private Metadata metadata = new Metadata(refreshBackoffMs, metadataExpireMs, true);
-    private AtomicReference<String> backgroundError = new AtomicReference<>();
+    private AtomicReference<Exception> backgroundError = new AtomicReference<>();
@After
public void tearDown() {
@@ -83,6 +84,30 @@ public class MetadataTest {
assertTrue("Update needed due to stale metadata.",
metadata.timeToNextUpdate(time) == 0);
}
+    @Test
+    public void testMetadataAwaitAfterClose() throws InterruptedException {
+        long time = 0;
+        metadata.update(Cluster.empty(), Collections.<String>emptySet(), time);
+        assertFalse("No update needed.", metadata.timeToNextUpdate(time) == 0);
+        metadata.requestUpdate();
+        assertFalse("Still no updated needed due to backoff", metadata.timeToNextUpdate(time) == 0);
+        time += refreshBackoffMs;
+        assertTrue("Update needed now that backoff time expired", metadata.timeToNextUpdate(time) == 0);
+        String topic = "my-topic";
+        metadata.close();
+        Thread t1 = asyncFetch(topic, 500);
+        t1.join();
+        assertTrue(backgroundError.get().getClass() == KafkaException.class);
+        assertTrue(backgroundError.get().toString().contains("Requested metadata update after close"));
+        clearBackgroundError();
+    }
+
+ @Test(expected = IllegalStateException.class)
+ public void testMetadataUpdateAfterClose() {
+ metadata.close();
+ metadata.update(Cluster.empty(), Collections.<String>emptySet(), 1000);
+ }
+
private static void checkTimeToNextUpdate(long refreshBackoffMs, long metadataExpireMs) {
long now = 10000;
@@ -409,15 +434,18 @@ public class MetadataTest {
assertTrue("Unused topic expired when expiry disabled",
metadata.containsTopic("topic4"));
}
+ private void clearBackgroundError() {
+ backgroundError.set(null);
+ }
+
private Thread asyncFetch(final String topic, final long maxWaitMs) {
Thread thread = new Thread() {
public void run() {
-                while (metadata.fetch().partitionsForTopic(topic).isEmpty()) {
-                    try {
+                try {
+                    while (metadata.fetch().partitionsForTopic(topic).isEmpty())
                        metadata.awaitUpdate(metadata.requestUpdate(), maxWaitMs);
-                    } catch (Exception e) {
-                        backgroundError.set(e.toString());
-                    }
+                } catch (Exception e) {
+                    backgroundError.set(e);
                }
}
};
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 0f64f13..6b41a9e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -533,6 +533,7 @@ public class MockClient implements KafkaClient {
@Override
public void close() {
+ metadata.close();
}
@Override
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 8bfc5e7..ad72a9e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -609,4 +609,52 @@ public class KafkaProducerTest {
producer.close(0, TimeUnit.MILLISECONDS);
}
}
+
+ @Test
+    public void testCloseWhenWaitingForMetadataUpdate() throws InterruptedException {
+        Properties props = new Properties();
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MAX_VALUE);
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+
+        // Simulate a case where metadata for a particular topic is not available. This will cause KafkaProducer#send to
+        // block in Metadata#awaitUpdate for the configured max.block.ms. When close() is invoked, KafkaProducer#send should
+        // return with a KafkaException.
+        String topicName = "test";
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster();
+        Node node = cluster.nodes().get(0);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, false);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        Producer<String, String> producer = new KafkaProducer<>(
+                new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+                new StringSerializer(), new StringSerializer(), metadata, client);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicReference<Exception> sendException = new AtomicReference<>();
+
+        try {
+            executor.submit(() -> {
+                try {
+                    // Metadata for topic "test" will not be available which will cause us to block indefinitely until
+                    // KafkaProducer#close is invoked.
+                    producer.send(new ProducerRecord<>(topicName, "key", "value"));
+                    fail();
+                } catch (Exception e) {
+                    sendException.set(e);
+                }
+            });
+
+            // Wait until metadata update for the topic has been requested
+            TestUtils.waitForCondition(() -> metadata.containsTopic(topicName), "Timeout when waiting for topic to be added to metadata");
+            producer.close(0, TimeUnit.MILLISECONDS);
+            TestUtils.waitForCondition(() -> sendException.get() != null, "No producer exception within timeout");
+            assertEquals(KafkaException.class, sendException.get().getClass());
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac28..7a8c710 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -637,16 +637,6 @@ public class MockProducerTest {
}
@Test
- public void shouldThrowOnCloseIfProducerIsClosed() {
- buildMockProducer(true);
- producer.close();
- try {
- producer.close();
- fail("Should have thrown as producer is already closed");
- } catch (IllegalStateException e) { }
- }
-
- @Test
public void shouldThrowOnFenceProducerIfProducerIsClosed() {
buildMockProducer(true);
producer.close();
diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala
index 92396a7..9cc6ebe 100755
--- a/core/src/main/scala/kafka/tools/MirrorMaker.scala
+++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala
@@ -31,7 +31,7 @@ import kafka.utils.{CommandLineUtils, CoreUtils, Logging, Whitelist}
import org.apache.kafka.clients.consumer.{CommitFailedException, Consumer, ConsumerConfig, ConsumerRebalanceListener, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.errors.WakeupException
@@ -357,6 +357,8 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
trace("Caught NoRecordsException, continue iteration.")
case _: WakeupException =>
trace("Caught WakeupException, continue iteration.")
+ case e: KafkaException if (shuttingDown || exitingOnSendFailure) =>
+ trace(s"Ignoring caught KafkaException during shutdown.
sendFailure: $exitingOnSendFailure.", e)
}
maybeFlushAndCommitOffsets()
}
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index ee0e90f..dc4041f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -35,6 +35,7 @@ import org.junit.Assert._
import org.junit.{After, Before, Test}
import scala.collection.mutable.{ArrayBuffer, Buffer}
+import scala.concurrent.ExecutionException
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@@ -446,8 +447,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
future.get()
fail("No message should be sent successfully.")
} catch {
-        case e: Exception =>
-          assertEquals("java.lang.IllegalStateException: Producer is closed forcefully.", e.getMessage)
+        case e: ExecutionException => assertEquals(classOf[KafkaException], e.getCause.getClass)
}
}
assertEquals("Fetch response should have no message returned.", 0,
consumer.poll(50).count)
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index 7969485..9b77c2d 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -205,7 +205,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
// create topic
createTopic(topic1, replicationFactor = numServers)
-    val record = new ProducerRecord[Array[Byte],Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic1, null, "key".getBytes, "value".getBytes)
// first send a message to make sure the metadata is refreshed
producer1.send(record).get