This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b9ea764566 KAFKA-17057: Add RETRY option to
ProductionExceptionHanlder (#17163)
6b9ea764566 is described below
commit 6b9ea76456646b8f1cfcbb43936c31b846840444
Author: Matthias J. Sax <[email protected]>
AuthorDate: Wed Sep 25 20:06:16 2024 -0700
KAFKA-17057: Add RETRY option to ProductionExceptionHanlder (#17163)
Implements KIP-1065
Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
<[email protected]>
---
.../errors/DefaultProductionExceptionHandler.java | 11 +-
.../streams/errors/ProductionExceptionHandler.java | 33 ++++--
.../processor/internals/RecordCollectorImpl.java | 65 ++++++------
...> SwallowUnknownTopicErrorIntegrationTest.java} | 111 ++++++++++++---------
.../processor/internals/RecordCollectorTest.java | 71 ++++++++++++-
5 files changed, 194 insertions(+), 97 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
index 0896114cf28..d6cc8e915e7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.errors;
import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.RetriableException;
import java.util.Map;
@@ -29,18 +30,22 @@ public class DefaultProductionExceptionHandler implements
ProductionExceptionHan
@Override
public ProductionExceptionHandlerResponse handle(final
ProducerRecord<byte[], byte[]> record,
final Exception
exception) {
- return ProductionExceptionHandlerResponse.FAIL;
+ return exception instanceof RetriableException ?
+ ProductionExceptionHandlerResponse.RETRY :
+ ProductionExceptionHandlerResponse.FAIL;
}
@Override
public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext
context,
final
ProducerRecord<byte[], byte[]> record,
final Exception
exception) {
- return ProductionExceptionHandlerResponse.FAIL;
+ return exception instanceof RetriableException ?
+ ProductionExceptionHandlerResponse.RETRY :
+ ProductionExceptionHandlerResponse.FAIL;
}
@Override
public void configure(final Map<String, ?> configs) {
// ignore
}
-}
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 02837b9dd80..95127887b36 100644
---
a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++
b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -83,18 +83,37 @@ public interface ProductionExceptionHandler extends
Configurable {
}
enum ProductionExceptionHandlerResponse {
- /* continue processing */
+ /** Continue processing.
+ *
+ * <p> For this case, output records which could not be written
successfully are lost.
+ * Use this option only if you can tolerate data loss.
+ */
CONTINUE(0, "CONTINUE"),
- /* fail processing */
- FAIL(1, "FAIL");
+ /** Fail processing.
+ *
+ * <p> Kafka Streams will raise an exception and the {@code
StreamsThread} will fail.
+ * No offsets (for {@link
org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or
transactions
+ * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2
exactly-once}) will be committed.
+ */
+ FAIL(1, "FAIL"),
+ /** Retry the failed operation.
+ *
+ * <p> Retrying might imply that a {@link TaskCorruptedException}
exception is thrown, and that the retry
+ * is started from the last committed offset.
+ *
+ * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+ * {@link org.apache.kafka.common.errors.RetriableException retriable
exceptions}.
+ * If {@code RETRY} is returned for a non-retriable exception it will
be interpreted as {@link #FAIL}.
+ */
+ RETRY(2, "RETRY");
/**
- * an english description of the api--this is for debugging and can
change
+ * An english description for the used option. This is for debugging
only and may change.
*/
public final String name;
/**
- * the permanent and immutable id of an API--this can't change ever
+ * The permanent and immutable id for the used option. This can't
change ever.
*/
public final int id;
@@ -106,9 +125,9 @@ public interface ProductionExceptionHandler extends
Configurable {
}
enum SerializationExceptionOrigin {
- /* serialization exception occurred during serialization of the key */
+ /** Serialization exception occurred during serialization of the key.
*/
KEY,
- /* serialization exception occurred during serialization of the value
*/
+ /** Serialization exception occurred during serialization of the
value. */
VALUE
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index bd589a8b7a1..a79e4073b60 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -34,7 +34,6 @@ import
org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Sensor;
@@ -160,7 +159,7 @@ public class RecordCollectorImpl implements RecordCollector
{
fatal
);
}
- if (partitions.size() > 0) {
+ if (!partitions.isEmpty()) {
final Optional<Set<Integer>> maybeMulticastPartitions =
partitioner.partitions(topic, key, value, partitions.size());
if (!maybeMulticastPartitions.isPresent()) {
// A null//empty partition indicates we should use the
default partitioner
@@ -342,7 +341,7 @@ public class RecordCollectorImpl implements RecordCollector
{
throw new FailedProcessingException("Fatal user code error in
production error callback", fatalUserException);
}
- if (response == ProductionExceptionHandlerResponse.FAIL) {
+ if (maybeFailResponse(response) ==
ProductionExceptionHandlerResponse.FAIL) {
throw new StreamsException(
String.format(
"Unable to serialize record. ProducerRecord(topic=[%s],
partition=[%d], timestamp=[%d]",
@@ -430,55 +429,53 @@ public class RecordCollectorImpl implements
RecordCollector {
"indicating the task may be migrated out";
sendException.set(new TaskMigratedException(errorMessage,
productionException));
} else {
- if (isRetriable(productionException)) {
+ final ProductionExceptionHandlerResponse response;
+ try {
+ response = Objects.requireNonNull(
+ productionExceptionHandler.handle(
+ errorHandlerContext(context, processorNodeId),
+ serializedRecord,
+ productionException
+ ),
+ "Invalid ProductionExceptionHandler response."
+ );
+ } catch (final RuntimeException fatalUserException) {
+ log.error(
+ "Production error callback failed after production error
for record {}",
+ serializedRecord,
+ productionException
+ );
+ sendException.set(new FailedProcessingException("Fatal user
code error in production error callback", fatalUserException));
+ return;
+ }
+
+ if (productionException instanceof RetriableException && response
== ProductionExceptionHandlerResponse.RETRY) {
errorMessage += "\nThe broker is either slow or in bad state
(like not having enough replicas) in responding the request, " +
"or the connection to broker was interrupted sending the
request or receiving the response. " +
"\nConsider overwriting `max.block.ms` and /or " +
"`delivery.timeout.ms` to a larger value to wait longer
for such scenarios and avoid timeout errors";
sendException.set(new
TaskCorruptedException(Collections.singleton(taskId)));
} else {
- final ProductionExceptionHandlerResponse response;
- try {
- response = Objects.requireNonNull(
- productionExceptionHandler.handle(
- errorHandlerContext(context, processorNodeId),
- serializedRecord,
- productionException
- ),
- "Invalid ProductionExceptionHandler response."
- );
- } catch (final RuntimeException fatalUserException) {
- log.error(
- "Production error callback failed after production
error for record {}",
- serializedRecord,
- productionException
- );
- sendException.set(new FailedProcessingException("Fatal
user code error in production error callback", fatalUserException));
- return;
- }
-
- if (response == ProductionExceptionHandlerResponse.FAIL) {
+ if (maybeFailResponse(response) ==
ProductionExceptionHandlerResponse.FAIL) {
errorMessage += "\nException handler choose to FAIL the
processing, no more records would be sent.";
sendException.set(new StreamsException(errorMessage,
productionException));
} else {
errorMessage += "\nException handler choose to CONTINUE
processing in spite of this error but written offsets would not be recorded.";
droppedRecordsSensor.record();
}
-
}
}
log.error(errorMessage, productionException);
}
- /**
- * The `TimeoutException` with root cause
`UnknownTopicOrPartitionException` is considered as non-retriable
- * (despite `TimeoutException` being a subclass of `RetriableException`,
this particular case is explicitly excluded).
- */
- private boolean isRetriable(final Exception exception) {
- return exception instanceof RetriableException &&
- (!(exception instanceof TimeoutException) ||
exception.getCause() == null
- || !(exception.getCause() instanceof
UnknownTopicOrPartitionException));
+ private ProductionExceptionHandlerResponse maybeFailResponse(final
ProductionExceptionHandlerResponse response) {
+ if (response == ProductionExceptionHandlerResponse.RETRY) {
+ log.warn("ProductionExceptionHandler returned RETRY for a
non-retriable exception. Will treat it as FAIL.");
+ return ProductionExceptionHandlerResponse.FAIL;
+ } else {
+ return response;
+ }
}
private boolean isFatalException(final Exception exception) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
similarity index 65%
rename from
streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
rename to
streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
index 873b2eb922d..25f9afd5428 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/CustomHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/SwallowUnknownTopicErrorIntegrationTest.java
@@ -16,24 +16,28 @@
*/
package org.apache.kafka.streams.integration;
-import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
@@ -48,16 +52,14 @@ import org.junit.jupiter.api.Timeout;
import java.io.IOException;
import java.util.Collections;
+import java.util.Map;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;
-
@Timeout(600)
@Tag("integration")
-public class CustomHandlerIntegrationTest {
+public class SwallowUnknownTopicErrorIntegrationTest {
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(NUM_BROKERS,
Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable",
"false")));
@@ -77,8 +79,7 @@ public class CustomHandlerIntegrationTest {
// topic name
private static final String STREAM_INPUT = "STREAM_INPUT";
private static final String NON_EXISTING_TOPIC = "non_existing_topic";
-
- private final AtomicReference<Throwable> caughtException = new
AtomicReference<>();
+ private static final String STREAM_OUTPUT = "STREAM_OUTPUT";
private KafkaStreams kafkaStreams;
private Topology topology;
@@ -87,19 +88,19 @@ public class CustomHandlerIntegrationTest {
@BeforeEach
public void before(final TestInfo testInfo) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
- CLUSTER.createTopics(STREAM_INPUT);
+ CLUSTER.createTopics(STREAM_INPUT, STREAM_OUTPUT);
final String safeTestName = safeUniqueTestName(testInfo);
appId = "app-" + safeTestName;
- builder.stream(STREAM_INPUT, Consumed.with(Serdes.Integer(),
Serdes.String()))
- .to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(),
Serdes.String()));
- produceRecords();
+ final KStream<Integer, String> stream = builder.stream(STREAM_INPUT,
Consumed.with(Serdes.Integer(), Serdes.String()));
+ stream.to(NON_EXISTING_TOPIC, Produced.with(Serdes.Integer(),
Serdes.String()));
+ stream.to(STREAM_OUTPUT, Produced.with(Serdes.Integer(),
Serdes.String()));
topology = builder.build();
}
@AfterEach
public void after() throws InterruptedException {
- CLUSTER.deleteTopics(STREAM_INPUT);
+ CLUSTER.deleteTopics(STREAM_INPUT, STREAM_OUTPUT);
if (kafkaStreams != null) {
kafkaStreams.close();
kafkaStreams.cleanUp();
@@ -108,15 +109,30 @@ public class CustomHandlerIntegrationTest {
private void produceRecords() {
final Properties props = TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- IntegerSerializer.class,
- StringSerializer.class,
- new Properties());
+ CLUSTER.bootstrapServers(),
+ IntegerSerializer.class,
+ StringSerializer.class
+ );
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
- STREAM_INPUT,
- Collections.singletonList(new KeyValue<>(1, "A")),
- props,
- CLUSTER.time.milliseconds() + 2
+ STREAM_INPUT,
+ Collections.singletonList(new KeyValue<>(1, "A")),
+ props,
+ CLUSTER.time.milliseconds() + 2
+ );
+ }
+
+ private void verifyResult() {
+ final Properties props = TestUtils.consumerConfig(
+ CLUSTER.bootstrapServers(),
+ "consumer",
+ IntegerDeserializer.class,
+ StringDeserializer.class
+ );
+
+ IntegrationTestUtils.verifyKeyValueTimestamps(
+ props,
+ STREAM_OUTPUT,
+ Collections.singletonList(new KeyValueTimestamp<>(1, "A",
CLUSTER.time.milliseconds() + 2))
);
}
@@ -124,12 +140,33 @@ public class CustomHandlerIntegrationTest {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.IntegerSerde.class);
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.StringSerde.class);
- streamsConfiguration.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
+
streamsConfiguration.put(StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
TestHandler.class);
return streamsConfiguration;
}
+ public static class TestHandler implements ProductionExceptionHandler {
+
+ public TestHandler() { }
+
+ @Override
+ public void configure(final Map<String, ?> configs) { }
+
+ @Override
+ public ProductionExceptionHandlerResponse handle(final
ErrorHandlerContext context,
+ final
ProducerRecord<byte[], byte[]> record,
+ final Exception
exception) {
+ if (exception instanceof TimeoutException &&
+ exception.getCause() != null &&
+ exception.getCause() instanceof
UnknownTopicOrPartitionException) {
+ return ProductionExceptionHandlerResponse.CONTINUE;
+ }
+ return ProductionExceptionHandler.super.handle(context, record,
exception);
+ }
+ }
+
private void closeApplication(final Properties streamsConfiguration)
throws Exception {
kafkaStreams.close();
kafkaStreams.cleanUp();
@@ -140,10 +177,6 @@ public class CustomHandlerIntegrationTest {
public void
shouldThrowStreamsExceptionWithMissingTopicAndDefaultExceptionHandler() throws
Exception {
final Properties streamsConfiguration = getCommonProperties();
kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler(e -> {
- caughtException.set(e);
- return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
- });
kafkaStreams.start();
TestUtils.waitForCondition(
() -> kafkaStreams.state() == State.RUNNING,
@@ -151,29 +184,9 @@ public class CustomHandlerIntegrationTest {
() -> "Kafka Streams application did not reach state RUNNING in "
+ timeoutMs + " ms"
);
- TestUtils.waitForCondition(
- this::receivedUnknownTopicOrPartitionException,
- timeoutMs,
- () -> "Did not receive UnknownTopicOrPartitionException"
- );
+ produceRecords();
+ verifyResult();
- TestUtils.waitForCondition(
- () -> kafkaStreams.state() == State.ERROR,
- timeoutMs,
- () -> "Kafka Streams application did not reach state ERROR in " +
timeoutMs + " ms"
- );
closeApplication(streamsConfiguration);
}
-
- private boolean receivedUnknownTopicOrPartitionException() {
- if (caughtException.get() == null) {
- return false;
- }
-
- assertInstanceOf(StreamsException.class, caughtException.get());
- assertInstanceOf(TimeoutException.class,
caughtException.get().getCause());
- assertInstanceOf(UnknownTopicOrPartitionException.class,
caughtException.get().getCause().getCause());
-
- return true;
- }
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index dc8e6682b46..353289fc72b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -54,6 +54,7 @@ import
org.apache.kafka.streams.errors.ProductionExceptionHandler;
import
org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse;
import
org.apache.kafka.streams.errors.ProductionExceptionHandler.SerializationExceptionOrigin;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
@@ -1346,15 +1347,40 @@ public class RecordCollectorTest {
collector.send(topic, "3", "0", null, null, stringSerializer,
stringSerializer, sinkNodeName, context, streamPartitioner);
- // With default handler which returns FAIL, flush() throws
StreamsException with TimeoutException cause,
- // otherwise it would throw a TaskCorruptedException with null cause
+ final TaskCorruptedException thrown =
assertThrows(TaskCorruptedException.class, collector::flush);
+ assertThat(
+ thrown.getMessage(),
+ equalTo("Tasks [0_0] are corrupted and hence need to be
re-initialized")
+ );
+ }
+
+ @Test
+ public void
shouldThrowStreamsExceptionOnUnknownTopicOrPartitionExceptionWhenExceptionHandlerReturnsFail()
{
+ final KafkaException exception = new TimeoutException("KABOOM!", new
UnknownTopicOrPartitionException());
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(exception),
+ new ProductionExceptionHandlerMock(
+ Optional.of(ProductionExceptionHandlerResponse.FAIL),
+ context,
+ sinkNodeName,
+ taskId
+ ),
+ streamsMetrics,
+ topology
+ );
+
+ collector.send(topic, "3", "0", null, null, stringSerializer,
stringSerializer, sinkNodeName, context, streamPartitioner);
+
+ // With custom handler which returns FAIL, flush() throws
StreamsException with TimeoutException cause
final StreamsException thrown = assertThrows(StreamsException.class,
collector::flush);
assertEquals(exception, thrown.getCause());
assertThat(
thrown.getMessage(),
equalTo("Error encountered sending record to topic topic for task
0_0 due to:" +
- "\norg.apache.kafka.common.errors.TimeoutException:
KABOOM!" +
- "\nException handler choose to FAIL the processing, no
more records would be sent.")
+ "\norg.apache.kafka.common.errors.TimeoutException: KABOOM!" +
+ "\nException handler choose to FAIL the processing, no more
records would be sent.")
);
}
@@ -1380,6 +1406,42 @@ public class RecordCollectorTest {
assertDoesNotThrow(collector::flush);
}
+ @Test
+ public void shouldTreatRetryAsFailForNonRetriableException() {
+ try (final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister(RecordCollectorImpl.class)) {
+ final RuntimeException exception = new RuntimeException("KABOOM!");
+ final RecordCollector collector = new RecordCollectorImpl(
+ logContext,
+ taskId,
+ getExceptionalStreamsProducerOnSend(exception),
+ new ProductionExceptionHandlerMock(
+ Optional.of(ProductionExceptionHandlerResponse.RETRY),
+ context,
+ sinkNodeName,
+ taskId
+ ),
+ streamsMetrics,
+ topology
+ );
+
+ collector.send(topic, "3", "0", null, null, stringSerializer,
stringSerializer, sinkNodeName, context, streamPartitioner);
+
+ final StreamsException thrown =
assertThrows(StreamsException.class, collector::flush);
+ assertEquals(exception, thrown.getCause());
+ assertThat(
+ thrown.getMessage(),
+ equalTo("Error encountered sending record to topic topic for
task 0_0 due to:" +
+ "\njava.lang.RuntimeException: KABOOM!" +
+ "\nException handler choose to FAIL the processing, no
more records would be sent.")
+ );
+
+ assertThat(
+ logCaptureAppender.getMessages().get(0),
+ equalTo("test ProductionExceptionHandler returned RETRY for a
non-retriable exception. Will treat it as FAIL.")
+ );
+ }
+ }
+
@Test
public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
final AtomicBoolean functionCalled = new AtomicBoolean(false);
@@ -1986,6 +2048,7 @@ public class RecordCollectorTest {
return response.orElse(null);
}
+ @SuppressWarnings("rawtypes")
@Override
public ProductionExceptionHandlerResponse
handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,