This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f1ef7a5a9f8 KAFKA-16448: Handle processing exceptions in punctuate (#16300)
f1ef7a5a9f8 is described below

commit f1ef7a5a9f87cda7ae1209fc2285259eba4204ca
Author: Sebastien Viale <[email protected]>
AuthorDate: Thu Aug 1 00:53:47 2024 +0200

    KAFKA-16448: Handle processing exceptions in punctuate (#16300)
    
    This PR is part of KIP-1033, which aims to bring a ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions that occur during processing.
    
    This PR catches processing exceptions thrown from punctuate.
    
    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: loicgreffier <[email protected]>
    
    Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../streams/processor/internals/StreamTask.java    |  44 ++++++-
 .../processor/internals/StreamTaskTest.java        | 137 ++++++++++++++++++++-
 3 files changed, 176 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba28341d1a3..8c96ce71237 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b253c6e16a..6f2edd442b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -29,12 +29,14 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.TopologyConfig.TaskConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -63,6 +65,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor;
 
@@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor restoreRemainingSensor;
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
+    private final Sensor droppedRecordsSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
 
     private final RecordQueueCreator recordQueueCreator;
@@ -160,6 +164,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+        droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics);
 
         for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
@@ -915,15 +920,48 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
 
         try {
             maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
-        } catch (final StreamsException e) {
+        } catch (final FailedProcessingException e) {
+            throw createStreamsException(node.name(), e.getCause());
+        } catch (final TaskCorruptedException | TaskMigratedException e) {
             throw e;
-        } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+        } catch (final Exception e) {
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                null,
+                recordContext.topic(),
+                recordContext.partition(),
+                recordContext.offset(),
+                recordContext.headers(),
+                node.name(),
+                id()
+            );
+
+            final ProcessingExceptionHandler.ProcessingHandlerResponse response;
+
+            try {
+                response = processingExceptionHandler.handle(errorHandlerContext, null, e);
+            } catch (final Exception fatalUserException) {
+                throw new FailedProcessingException(fatalUserException);
+            }
+
+            if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                log.error("Processing exception handler is set to fail upon" +
+                        " a processing error. If you would rather have the streaming pipeline" +
+                        " continue after a processing error, please set the " +
+                        PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+
+                throw createStreamsException(node.name(), e);
+            } else {
+                droppedRecordsSensor.record();
+            }
         } finally {
             processorContext.setCurrentNode(null);
         }
     }
 
+    private StreamsException createStreamsException(final String processorName, final Throwable cause) {
+        return new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, processorName), cause);
+    }
+
     @SuppressWarnings("unchecked")
     private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
                                         final long wallClockTime,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 817ffe1f74d..a8771c21539 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -45,14 +46,19 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
@@ -121,6 +127,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -240,13 +247,18 @@ public class StreamTaskTest {
     }
 
     private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
-        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
+        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName());
+    }
+
+    private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) {
+        return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName());
     }
 
     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
-        final String deserializationExceptionHandler) {
+        final String deserializationExceptionHandler,
+        final String processingExceptionHandler) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -262,7 +274,8 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
-            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
+            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
+            mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
         )));
     }
 
@@ -2647,6 +2660,124 @@ public class StreamTaskTest {
         verify(recordCollector, never()).offsets();
     }
 
+    @Test
+    public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final StreamsException streamsException = assertThrows(StreamsException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new FailedProcessingException(
+                    new RuntimeException("KABOOM!")
+                );
+            })
+        );
+
+        assertInstanceOf(RuntimeException.class, streamsException.getCause());
+        assertEquals("KABOOM!", streamsException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final Set<TaskId> tasksIds = new HashSet<>();
+        tasksIds.add(new TaskId(0, 0));
+        final TaskCorruptedException expectedException = new TaskCorruptedException(tasksIds, new InvalidOffsetException("Invalid offset") {
+            @Override
+            public Set<TopicPartition> partitions() {
+                return new HashSet<>(Collections.singletonList(new TopicPartition("topic", 0)));
+            }
+        });
+
+        final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw expectedException;
+            })
+        );
+
+        assertEquals(expectedException, taskCorruptedException);
+    }
+
+    @Test
+    public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause"));
+
+        final TaskMigratedException taskCorruptedException = assertThrows(TaskMigratedException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw expectedException;
+            })
+        );
+
+        assertEquals(expectedException, taskCorruptedException);
+    }
+
+    @Test
+    public void shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        assertDoesNotThrow(() ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            })
+        );
+    }
+
+    @Test
+    public void shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()));
+
+        final StreamsException streamsException = assertThrows(StreamsException.class,
+            () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            }));
+
+        assertInstanceOf(KafkaException.class, streamsException.getCause());
+        assertEquals("KABOOM!", streamsException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+                LogAndFailExceptionHandler.class.getName(), ProcessingExceptionHandlerMock.class.getName()));
+
+        final FailedProcessingException streamsException = assertThrows(FailedProcessingException.class,
+            () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            }));
+
+        assertInstanceOf(RuntimeException.class, streamsException.getCause());
+        assertEquals("KABOOM from ProcessingExceptionHandlerMock!", streamsException.getCause().getMessage());
+    }
+
+    public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler {
+        @Override
+        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+            throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
+        }
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // No-op
+        }
+    }
 
     private ProcessorStateManager mockStateManager() {
         final ProcessorStateManager manager = mock(ProcessorStateManager.class);
