This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new c8dc09c2659 KAFKA-16448: Handle fatal user exception during processing
error (#16675)
c8dc09c2659 is described below
commit c8dc09c2659bb6309d97c692c907c076898b4aeb
Author: Sebastien Viale <[email protected]>
AuthorDate: Wed Jul 31 07:58:07 2024 +0200
KAFKA-16448: Handle fatal user exception during processing error (#16675)
This PR is part of KIP-1033 which aims to bring a
ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions
that occur during processing.
This PR catch the exceptions thrown while handling a processing exception
Co-authored-by: Dabz <[email protected]>
Co-authored-by: loicgreffier <[email protected]>
Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/internals/ProcessorNode.java | 8 +-
.../ProcessingExceptionHandlerIntegrationTest.java | 146 +++++++++++++++++++++
.../processor/internals/ProcessorNodeTest.java | 31 ++++-
3 files changed, 179 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 763edc9a045..175c9e104ef 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -213,9 +213,13 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId());
- final ProcessingExceptionHandler.ProcessingHandlerResponse
response = processingExceptionHandler
- .handle(errorHandlerContext, record, e);
+ final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
+ try {
+ response =
processingExceptionHandler.handle(errorHandlerContext, record, e);
+ } catch (final Exception fatalUserException) {
+ throw new FailedProcessingException(fatalUserException);
+ }
if (response ==
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
log.error("Processing exception handler is set to fail upon" +
" a processing error. If you would rather have the
streaming pipeline" +
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 6c1a64344ec..61b5ed16bb1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -45,10 +45,12 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertIterableEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -149,9 +151,150 @@ public class ProcessingExceptionHandlerIntegrationTest {
}
}
+ @Test
+ public void
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler()
{
+ final KeyValue<String, String> event = new KeyValue<>("ID123-1",
"ID123-A1");
+ final KeyValue<String, String> eventError = new
KeyValue<>("ID123-2-ERR", "ID123-A2");
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ final AtomicBoolean isExecuted = new AtomicBoolean(false);
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .map(KeyValue::new)
+ .mapValues(value -> value)
+ .process(runtimeErrorProcessorSupplierMock())
+ .map((k, v) -> {
+ isExecuted.set(true);
+ return KeyValue.pair(k, v);
+ })
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
FailProcessingExceptionHandlerMockTest.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+ isExecuted.set(false);
+ inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ assertTrue(isExecuted.get());
+ isExecuted.set(false);
+ final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+ assertTrue(e.getMessage().contains("Exception caught in process. "
+ + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000,
topic=TOPIC_NAME, "
+ + "partition=0, offset=1,
stacktrace=java.lang.RuntimeException: "
+ + "Exception should be handled by processing exception
handler"));
+ assertFalse(isExecuted.get());
+ }
+ }
+
+ @Test
+ public void
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler()
{
+ final KeyValue<String, String> event = new KeyValue<>("ID123-1",
"ID123-A1");
+ final KeyValue<String, String> eventFalse = new
KeyValue<>("ID123-2-ERR", "ID123-A2");
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ final AtomicBoolean isExecuted = new AtomicBoolean(false);
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .map(KeyValue::new)
+ .mapValues(value -> value)
+ .process(runtimeErrorProcessorSupplierMock())
+ .map((k, v) -> {
+ isExecuted.set(true);
+ return KeyValue.pair(k, v);
+ })
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
ContinueProcessingExceptionHandlerMockTest.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+ isExecuted.set(false);
+ inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ assertTrue(isExecuted.get());
+ isExecuted.set(false);
+ inputTopic.pipeInput(eventFalse.key, eventFalse.value,
Instant.EPOCH);
+ assertFalse(isExecuted.get());
+ }
+ }
+
+ @Test
+ public void
shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() {
+ final KeyValue<String, String> event = new KeyValue<>("ID123-1",
"ID123-A1");
+ final KeyValue<String, String> eventError = new
KeyValue<>("ID123-ERR-FATAL", "ID123-A2");
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ final AtomicBoolean isExecuted = new AtomicBoolean(false);
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .map(KeyValue::new)
+ .mapValues(value -> value)
+ .process(runtimeErrorProcessorSupplierMock())
+ .map((k, v) -> {
+ isExecuted.set(true);
+ return KeyValue.pair(k, v);
+ })
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
FailProcessingExceptionHandlerMockTest.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+ isExecuted.set(false);
+ inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ assertTrue(isExecuted.get());
+ isExecuted.set(false);
+ final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+ assertEquals("KABOOM!", e.getCause().getMessage());
+ assertFalse(isExecuted.get());
+ }
+ }
+
+ @Test
+ public void
shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler()
{
+ final KeyValue<String, String> event = new KeyValue<>("ID123-1",
"ID123-A1");
+ final KeyValue<String, String> eventError = new
KeyValue<>("ID123-ERR-FATAL", "ID123-A2");
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ final AtomicBoolean isExecuted = new AtomicBoolean(false);
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .map(KeyValue::new)
+ .mapValues(value -> value)
+ .process(runtimeErrorProcessorSupplierMock())
+ .map((k, v) -> {
+ isExecuted.set(true);
+ return KeyValue.pair(k, v);
+ })
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
ContinueProcessingExceptionHandlerMockTest.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+ isExecuted.set(false);
+ inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+ assertTrue(isExecuted.get());
+ isExecuted.set(false);
+ final StreamsException e = assertThrows(StreamsException.class, ()
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+ assertEquals("KABOOM!", e.getCause().getMessage());
+ assertFalse(isExecuted.get());
+ }
+ }
+
public static class ContinueProcessingExceptionHandlerMockTest implements
ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ if (((String) record.key()).contains("FATAL")) {
+ throw new RuntimeException("KABOOM!");
+ }
assertProcessingExceptionHandlerInputs(context, record, exception);
return
ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
}
@@ -165,6 +308,9 @@ public class ProcessingExceptionHandlerIntegrationTest {
public static class FailProcessingExceptionHandlerMockTest implements
ProcessingExceptionHandler {
@Override
public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ if (((String) record.key()).contains("FATAL")) {
+ throw new RuntimeException("KABOOM!");
+ }
assertProcessingExceptionHandlerInputs(context, record, exception);
return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index df3f9276863..9fe9244e0aa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -60,6 +60,7 @@ import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -100,7 +101,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
- node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
internalProcessorContext));
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
internalProcessorContext, false));
final FailedProcessingException failedProcessingException =
assertThrows(FailedProcessingException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -116,7 +117,7 @@ public class ProcessorNodeTest {
new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
- node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
internalProcessorContext));
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
internalProcessorContext, false));
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE,
TIMESTAMP)));
}
@@ -146,6 +147,21 @@ public class ProcessorNodeTest {
verify(processingExceptionHandler, never()).handle(any(), any(),
any());
}
+ @Test
+ public void
shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException()
{
+ final ProcessorNode<Object, Object, Object, Object> node =
+ new ProcessorNode<>(NAME, new
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
+
+ final InternalProcessorContext<Object, Object>
internalProcessorContext = mockInternalProcessorContext();
+ node.init(internalProcessorContext, new
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
internalProcessorContext, true));
+
+ final FailedProcessingException failedProcessingException =
assertThrows(FailedProcessingException.class,
+ () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+ assertInstanceOf(RuntimeException.class,
failedProcessingException.getCause());
+ assertEquals("KABOOM!",
failedProcessingException.getCause().getMessage());
+ }
+
private static class ExceptionalProcessor implements Processor<Object,
Object, Object, Object> {
@Override
public void init(final ProcessorContext<Object, Object> context) {
@@ -323,10 +339,14 @@ public class ProcessorNodeTest {
private final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
private final InternalProcessorContext<Object, Object>
internalProcessorContext;
+ private final boolean shouldThrowException;
+
public ProcessingExceptionHandlerMock(final
ProcessingExceptionHandler.ProcessingHandlerResponse response,
- final
InternalProcessorContext<Object, Object> internalProcessorContext) {
+ final
InternalProcessorContext<Object, Object> internalProcessorContext,
+ final boolean
shouldThrowException) {
this.response = response;
this.internalProcessorContext = internalProcessorContext;
+ this.shouldThrowException = shouldThrowException;
}
@Override
@@ -338,9 +358,12 @@ public class ProcessorNodeTest {
assertEquals(internalProcessorContext.taskId(), context.taskId());
assertEquals(KEY, record.key());
assertEquals(VALUE, record.value());
- assertTrue(exception instanceof RuntimeException);
+ assertInstanceOf(RuntimeException.class, exception);
assertEquals("Processing exception should be caught and handled by
the processing exception handler.", exception.getMessage());
+ if (shouldThrowException) {
+ throw new RuntimeException("KABOOM!");
+ }
return response;
}