This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0dc9b9e4eec KAFKA-16448: Handle fatal user exception during processing 
error (#16675)
0dc9b9e4eec is described below

commit 0dc9b9e4eec124df9f5c92b41db2b4fb7bd49600
Author: Sebastien Viale <[email protected]>
AuthorDate: Wed Jul 31 07:58:07 2024 +0200

    KAFKA-16448: Handle fatal user exception during processing error (#16675)
    
    This PR is part of KIP-1033 which aims to bring a 
ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions 
that occur during processing.
    
    This PR catch the exceptions thrown while handling a processing exception
    
    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: loicgreffier <[email protected]>
    
    Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../streams/processor/internals/ProcessorNode.java |   8 +-
 .../ProcessingExceptionHandlerIntegrationTest.java | 146 +++++++++++++++++++++
 .../processor/internals/ProcessorNodeTest.java     |  31 ++++-
 3 files changed, 179 insertions(+), 6 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 08da872bb73..3ccfcf24905 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -212,9 +212,13 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
                 internalProcessorContext.currentNode().name(),
                 internalProcessorContext.taskId());
 
-            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response = processingExceptionHandler
-                .handle(errorHandlerContext, record, e);
+            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
 
+            try {
+                response = 
processingExceptionHandler.handle(errorHandlerContext, record, e);
+            } catch (final Exception fatalUserException) {
+                throw new FailedProcessingException(fatalUserException);
+            }
             if (response == 
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
                 log.error("Processing exception handler is set to fail upon" +
                      " a processing error. If you would rather have the 
streaming pipeline" +
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 6c1a64344ec..61b5ed16bb1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -45,10 +45,12 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertIterableEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -149,9 +151,150 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler()
 {
+        final KeyValue<String, String> event = new KeyValue<>("ID123-1", 
"ID123-A1");
+        final KeyValue<String, String> eventError = new 
KeyValue<>("ID123-2-ERR", "ID123-A2");
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final AtomicBoolean isExecuted = new AtomicBoolean(false);
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .map(KeyValue::new)
+            .mapValues(value -> value)
+            .process(runtimeErrorProcessorSupplierMock())
+            .map((k, v) -> {
+                isExecuted.set(true);
+                return KeyValue.pair(k, v);
+            })
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
FailProcessingExceptionHandlerMockTest.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+            isExecuted.set(false);
+            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            assertTrue(isExecuted.get());
+            isExecuted.set(false);
+            final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+            assertTrue(e.getMessage().contains("Exception caught in process. "
+                + "taskId=0_0, processor=KSTREAM-SOURCE-0000000000, 
topic=TOPIC_NAME, "
+                + "partition=0, offset=1, 
stacktrace=java.lang.RuntimeException: "
+                + "Exception should be handled by processing exception 
handler"));
+            assertFalse(isExecuted.get());
+        }
+    }
+
+    @Test
+    public void 
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInContinueProcessingExceptionHandler()
 {
+        final KeyValue<String, String> event = new KeyValue<>("ID123-1", 
"ID123-A1");
+        final KeyValue<String, String> eventFalse = new 
KeyValue<>("ID123-2-ERR", "ID123-A2");
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final AtomicBoolean isExecuted = new AtomicBoolean(false);
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .map(KeyValue::new)
+            .mapValues(value -> value)
+            .process(runtimeErrorProcessorSupplierMock())
+            .map((k, v) -> {
+                isExecuted.set(true);
+                return KeyValue.pair(k, v);
+            })
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
ContinueProcessingExceptionHandlerMockTest.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+            isExecuted.set(false);
+            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            assertTrue(isExecuted.get());
+            isExecuted.set(false);
+            inputTopic.pipeInput(eventFalse.key, eventFalse.value, 
Instant.EPOCH);
+            assertFalse(isExecuted.get());
+        }
+    }
+
+    @Test
+    public void 
shouldStopProcessingWhenFatalUserExceptionInFailProcessingExceptionHandler() {
+        final KeyValue<String, String> event = new KeyValue<>("ID123-1", 
"ID123-A1");
+        final KeyValue<String, String> eventError = new 
KeyValue<>("ID123-ERR-FATAL", "ID123-A2");
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final AtomicBoolean isExecuted = new AtomicBoolean(false);
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .map(KeyValue::new)
+            .mapValues(value -> value)
+            .process(runtimeErrorProcessorSupplierMock())
+            .map((k, v) -> {
+                isExecuted.set(true);
+                return KeyValue.pair(k, v);
+            })
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
FailProcessingExceptionHandlerMockTest.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+            isExecuted.set(false);
+            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            assertTrue(isExecuted.get());
+            isExecuted.set(false);
+            final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+            assertEquals("KABOOM!", e.getCause().getMessage());
+            assertFalse(isExecuted.get());
+        }
+    }
+
+    @Test
+    public void 
shouldStopProcessingWhenFatalUserExceptionInContinueProcessingExceptionHandler()
 {
+        final KeyValue<String, String> event = new KeyValue<>("ID123-1", 
"ID123-A1");
+        final KeyValue<String, String> eventError = new 
KeyValue<>("ID123-ERR-FATAL", "ID123-A2");
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        final AtomicBoolean isExecuted = new AtomicBoolean(false);
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .map(KeyValue::new)
+            .mapValues(value -> value)
+            .process(runtimeErrorProcessorSupplierMock())
+            .map((k, v) -> {
+                isExecuted.set(true);
+                return KeyValue.pair(k, v);
+            })
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
ContinueProcessingExceptionHandlerMockTest.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+            isExecuted.set(false);
+            inputTopic.pipeInput(event.key, event.value, Instant.EPOCH);
+            assertTrue(isExecuted.get());
+            isExecuted.set(false);
+            final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
+            assertEquals("KABOOM!", e.getCause().getMessage());
+            assertFalse(isExecuted.get());
+        }
+    }
+
     public static class ContinueProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
         @Override
         public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+            if (((String) record.key()).contains("FATAL")) {
+                throw new RuntimeException("KABOOM!");
+            }
             assertProcessingExceptionHandlerInputs(context, record, exception);
             return 
ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE;
         }
@@ -165,6 +308,9 @@ public class ProcessingExceptionHandlerIntegrationTest {
     public static class FailProcessingExceptionHandlerMockTest implements 
ProcessingExceptionHandler {
         @Override
         public ProcessingExceptionHandler.ProcessingHandlerResponse 
handle(final ErrorHandlerContext context, final Record<?, ?> record, final 
Exception exception) {
+            if (((String) record.key()).contains("FATAL")) {
+                throw new RuntimeException("KABOOM!");
+            }
             assertProcessingExceptionHandlerInputs(context, record, exception);
             return ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL;
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index df3f9276863..9fe9244e0aa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -60,6 +60,7 @@ import static 
org.apache.kafka.streams.processor.internals.metrics.StreamsMetric
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -100,7 +101,7 @@ public class ProcessorNodeTest {
             new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
 
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
-        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext));
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL,
 internalProcessorContext, false));
 
         final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
             () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
@@ -116,7 +117,7 @@ public class ProcessorNodeTest {
             new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
 
         final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
-        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext));
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext, false));
 
         assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, 
TIMESTAMP)));
     }
@@ -146,6 +147,21 @@ public class ProcessorNodeTest {
         verify(processingExceptionHandler, never()).handle(any(), any(), 
any());
     }
 
+    @Test
+    public void 
shouldThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException()
 {
+        final ProcessorNode<Object, Object, Object, Object> node =
+                new ProcessorNode<>(NAME, new 
IgnoredInternalExceptionsProcessor(), Collections.emptySet());
+
+        final InternalProcessorContext<Object, Object> 
internalProcessorContext = mockInternalProcessorContext();
+        node.init(internalProcessorContext, new 
ProcessingExceptionHandlerMock(ProcessingExceptionHandler.ProcessingHandlerResponse.CONTINUE,
 internalProcessorContext, true));
+
+        final FailedProcessingException failedProcessingException = 
assertThrows(FailedProcessingException.class,
+            () -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
+
+        assertInstanceOf(RuntimeException.class, 
failedProcessingException.getCause());
+        assertEquals("KABOOM!", 
failedProcessingException.getCause().getMessage());
+    }
+
     private static class ExceptionalProcessor implements Processor<Object, 
Object, Object, Object> {
         @Override
         public void init(final ProcessorContext<Object, Object> context) {
@@ -323,10 +339,14 @@ public class ProcessorNodeTest {
         private final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
         private final InternalProcessorContext<Object, Object> 
internalProcessorContext;
 
+        private final boolean shouldThrowException;
+
         public ProcessingExceptionHandlerMock(final 
ProcessingExceptionHandler.ProcessingHandlerResponse response,
-                                              final 
InternalProcessorContext<Object, Object> internalProcessorContext) {
+                                              final 
InternalProcessorContext<Object, Object> internalProcessorContext,
+                                              final boolean 
shouldThrowException) {
             this.response = response;
             this.internalProcessorContext = internalProcessorContext;
+            this.shouldThrowException = shouldThrowException;
         }
 
         @Override
@@ -338,9 +358,12 @@ public class ProcessorNodeTest {
             assertEquals(internalProcessorContext.taskId(), context.taskId());
             assertEquals(KEY, record.key());
             assertEquals(VALUE, record.value());
-            assertTrue(exception instanceof RuntimeException);
+            assertInstanceOf(RuntimeException.class, exception);
             assertEquals("Processing exception should be caught and handled by 
the processing exception handler.", exception.getMessage());
 
+            if (shouldThrowException) {
+                throw new RuntimeException("KABOOM!");
+            }
             return response;
         }
 

Reply via email to