This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 31a9f6a KAFKA-9310: Handle UnknownProducerId from
RecordCollector.send (#7845)
31a9f6a is described below
commit 31a9f6add18e8489597f4ba159f77b2d2bd0531a
Author: John Roesler <[email protected]>
AuthorDate: Sat Dec 21 12:30:36 2019 -0600
KAFKA-9310: Handle UnknownProducerId from RecordCollector.send (#7845)
Reviewers: Matthias J. Sax <[email protected]>
---
.../processor/internals/RecordCollectorImpl.java | 16 ++--
.../processor/internals/RecordCollectorTest.java | 90 +++++++++++++++++++---
2 files changed, 91 insertions(+), 15 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index d72115e..ea573f2 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.processor.internals;
-import java.util.Collections;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
@@ -35,8 +34,8 @@ import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
@@ -45,6 +44,7 @@ import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.slf4j.Logger;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -236,12 +236,10 @@ public class RecordCollectorImpl implements
RecordCollector {
e
);
} catch (final RuntimeException uncaughtException) {
- if (uncaughtException instanceof KafkaException &&
- uncaughtException.getCause() instanceof
ProducerFencedException) {
- final KafkaException kafkaException = (KafkaException)
uncaughtException;
+ if (isRecoverable(uncaughtException)) {
// producer.send() call may throw a KafkaException which wraps
a FencedException,
// in this case we should throw its wrapped inner cause so
that it can be captured and re-wrapped as TaskMigrationException
- throw new RecoverableClientException("Caught a wrapped
ProducerFencedException", kafkaException);
+ throw new RecoverableClientException("Caught a recoverable
exception", uncaughtException);
} else {
throw new StreamsException(
String.format(
@@ -257,6 +255,12 @@ public class RecordCollectorImpl implements
RecordCollector {
}
}
+ public static boolean isRecoverable(final RuntimeException
uncaughtException) {
+ return uncaughtException instanceof KafkaException && (
+ uncaughtException.getCause() instanceof ProducerFencedException ||
+ uncaughtException.getCause() instanceof
UnknownProducerIdException);
+ }
+
private void checkForException() {
if (sendException != null) {
throw sendException;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 72d315f..7bc616d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -53,6 +54,7 @@ import java.util.concurrent.Future;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -174,42 +176,112 @@ public class RecordCollectorTest {
assertThat(collector.offsets().get(topicPartition), equalTo(2L));
}
- @SuppressWarnings("unchecked")
- @Test(expected = StreamsException.class)
+ @Test
public void
shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
final RecordCollector collector = new RecordCollectorImpl(
"test",
logContext,
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.init(new MockProducer(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
- public synchronized Future<RecordMetadata> send(final
ProducerRecord record, final Callback callback) {
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
throw new KafkaException();
}
});
- collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner);
+ final StreamsException thrown = assertThrows(StreamsException.class,
() ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
}
- @SuppressWarnings("unchecked")
- @Test(expected = RecoverableClientException.class)
+ @Test
+ public void shouldThrowRecoverableExceptionOnProducerFencedException() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("dropped-records")
+ );
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ throw new KafkaException(new ProducerFencedException("asdf"));
+ }
+ });
+
+ final RecoverableClientException thrown =
assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+ assertThat(thrown.getCause().getCause(),
instanceOf(ProducerFencedException.class));
+ }
+
+ @Test
+ public void shouldThrowRecoverableExceptionOnUnknownProducerException() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("dropped-records")
+ );
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ throw new KafkaException(new
UnknownProducerIdException("asdf"));
+ }
+ });
+
+ final RecoverableClientException thrown =
assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(), instanceOf(KafkaException.class));
+ assertThat(thrown.getCause().getCause(),
instanceOf(UnknownProducerIdException.class));
+ }
+
+ @Test
public void shouldThrowRecoverableExceptionWhenProducerFencedInCallback() {
final RecordCollector collector = new RecordCollectorImpl(
"test",
logContext,
new DefaultProductionExceptionHandler(),
new Metrics().sensor("skipped-records"));
- collector.init(new MockProducer(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
@Override
- public synchronized Future<RecordMetadata> send(final
ProducerRecord record, final Callback callback) {
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
callback.onCompletion(null, new
ProducerFencedException("asdf"));
return null;
}
});
collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner);
+ final RecoverableClientException thrown =
assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(),
instanceOf(ProducerFencedException.class));
+ }
+
+ @Test
+ public void
shouldThrowRecoverableExceptionWhenProducerForgottenInCallback() {
+ final RecordCollector collector = new RecordCollectorImpl(
+ "test",
+ logContext,
+ new DefaultProductionExceptionHandler(),
+ new Metrics().sensor("skipped-records"));
+ collector.init(new MockProducer<byte[], byte[]>(cluster, true, new
DefaultPartitioner(), byteArraySerializer, byteArraySerializer) {
+ @Override
+ public synchronized Future<RecordMetadata> send(final
ProducerRecord<byte[], byte[]> record, final Callback callback) {
+ callback.onCompletion(null, new
UnknownProducerIdException("asdf"));
+ return null;
+ }
+ });
+
collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner);
+ final RecoverableClientException thrown =
assertThrows(RecoverableClientException.class, () ->
+ collector.send("topic1", "3", "0", null, null, stringSerializer,
stringSerializer, streamPartitioner)
+ );
+ assertThat(thrown.getCause(),
instanceOf(UnknownProducerIdException.class));
}
@SuppressWarnings("unchecked")