This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 578fef23558830800b9e1ec561e6ec8448134d82
Author: Sebastien Viale <[email protected]>
AuthorDate: Thu Aug 1 00:53:47 2024 +0200

    KAFKA-16448: Handle processing exceptions in punctuate (#16300)

    This PR is part of KIP-1033 which aims to bring a ProcessingExceptionHandler
    to Kafka Streams in order to deal with exceptions that occur during processing.

    This PR actually catches processing exceptions from punctuate.

    Co-authored-by: Dabz <[email protected]>
    Co-authored-by: loicgreffier <[email protected]>

    Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax <[email protected]>
---
 checkstyle/suppressions.xml                      |   2 +-
 .../streams/processor/internals/StreamTask.java  |  44 ++++++-
 .../processor/internals/StreamTaskTest.java      | 137 ++++++++++++++++++++-
 3 files changed, 176 insertions(+), 7 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba28341d1a3..8c96ce71237 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
 
     <!-- Streams -->
     <suppress checks="ClassFanOutComplexity"
-              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
+              files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
 
     <suppress checks="MethodLength"
               files="KTableImpl.java"/>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b253c6e16a..6f2edd442b0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -29,12 +29,14 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.TopologyConfig.TaskConfig;
 import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
 import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.Cancellable;
 import org.apache.kafka.streams.processor.PunctuationType;
@@ -63,6 +65,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
 import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor;
 
@@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     private final Sensor restoreRemainingSensor;
     private final Sensor punctuateLatencySensor;
     private final Sensor bufferedRecordsSensor;
+    private final Sensor droppedRecordsSensor;
     private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
 
     private final RecordQueueCreator recordQueueCreator;
@@ -160,6 +164,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         processLatencySensor = TaskMetrics.processLatencySensor(threadId, taskId, streamsMetrics);
         punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId, streamsMetrics);
         bufferedRecordsSensor = TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+        droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId, taskId, streamsMetrics);
 
         for (final String terminalNodeName : topology.terminalNodes()) {
             e2eLatencySensors.put(
@@ -915,15 +920,48 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
         try {
             maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time, punctuateLatencySensor);
-        } catch (final StreamsException e) {
+        } catch (final FailedProcessingException e) {
+            throw createStreamsException(node.name(), e.getCause());
+        } catch (final TaskCorruptedException | TaskMigratedException e) {
             throw e;
-        } catch (final RuntimeException e) {
-            throw new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, node.name()), e);
+        } catch (final Exception e) {
+            final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
+                null,
+                recordContext.topic(),
+                recordContext.partition(),
+                recordContext.offset(),
+                recordContext.headers(),
+                node.name(),
+                id()
+            );
+
+            final ProcessingExceptionHandler.ProcessingHandlerResponse response;
+
+            try {
+                response = processingExceptionHandler.handle(errorHandlerContext, null, e);
+            } catch (final Exception fatalUserException) {
+                throw new FailedProcessingException(fatalUserException);
+            }
+
+            if (response == ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+                log.error("Processing exception handler is set to fail upon" +
+                    " a processing error. If you would rather have the streaming pipeline" +
+                    " continue after a processing error, please set the " +
+                    PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + " appropriately.");
+                throw createStreamsException(node.name(), e);
+            } else {
+                droppedRecordsSensor.record();
+            }
         } finally {
             processorContext.setCurrentNode(null);
         }
     }
 
+    private StreamsException createStreamsException(final String processorName, final Throwable cause) {
+        return new StreamsException(String.format("%sException caught while punctuating processor '%s'", logPrefix, processorName), cause);
+    }
+
     @SuppressWarnings("unchecked")
     private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?> currNode,
                                         final long wallClockTime,
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 817ffe1f74d..a8771c21539 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
 import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -45,14 +46,19 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
 import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.PunctuationType;
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.StateStore;
@@ -121,6 +127,7 @@ import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -240,13 +247,18 @@ public class StreamTaskTest {
     }
 
     private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue) {
-        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName());
+        return createConfig(eosConfig, enforcedProcessingValue, LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName());
+    }
+
+    private static StreamsConfig createConfig(final String eosConfig, final String enforcedProcessingValue, final String deserializationExceptionHandler) {
+        return createConfig(eosConfig, enforcedProcessingValue, deserializationExceptionHandler, LogAndFailProcessingExceptionHandler.class.getName());
     }
 
     private static StreamsConfig createConfig(
         final String eosConfig,
         final String enforcedProcessingValue,
-        final String deserializationExceptionHandler) {
+        final String deserializationExceptionHandler,
+        final String processingExceptionHandler) {
         final String canonicalPath;
         try {
             canonicalPath = BASE_DIR.getCanonicalPath();
@@ -262,7 +274,8 @@ public class StreamTaskTest {
             mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()),
             mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
             mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, enforcedProcessingValue),
-            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler)
+            mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, deserializationExceptionHandler),
+            mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, processingExceptionHandler)
         )));
     }
 
@@ -2647,6 +2660,124 @@ public class StreamTaskTest {
         verify(recordCollector, never()).offsets();
     }
 
+    @Test
+    public void shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final StreamsException streamsException = assertThrows(StreamsException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new FailedProcessingException(
+                    new RuntimeException("KABOOM!")
+                );
+            })
+        );
+
+        assertInstanceOf(RuntimeException.class, streamsException.getCause());
+        assertEquals("KABOOM!", streamsException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final Set<TaskId> tasksIds = new HashSet<>();
+        tasksIds.add(new TaskId(0, 0));
+        final TaskCorruptedException expectedException = new TaskCorruptedException(tasksIds, new InvalidOffsetException("Invalid offset") {
+            @Override
+            public Set<TopicPartition> partitions() {
+                return new HashSet<>(Collections.singletonList(new TopicPartition("topic", 0)));
+            }
+        });
+
+        final TaskCorruptedException taskCorruptedException = assertThrows(TaskCorruptedException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw expectedException;
+            })
+        );
+
+        assertEquals(expectedException, taskCorruptedException);
+    }
+
+    @Test
+    public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        final TaskMigratedException expectedException = new TaskMigratedException("TaskMigratedException", new RuntimeException("Task migrated cause"));
+
+        final TaskMigratedException taskCorruptedException = assertThrows(TaskMigratedException.class, () ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw expectedException;
+            })
+        );
+
+        assertEquals(expectedException, taskCorruptedException);
+    }
+
+    @Test
+    public void shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndContinueProcessingExceptionHandler.class.getName()));
+
+        assertDoesNotThrow(() ->
+            task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            })
+        );
+    }
+
+    @Test
+    public void shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), LogAndFailProcessingExceptionHandler.class.getName()));
+
+        final StreamsException streamsException = assertThrows(StreamsException.class,
+            () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            }));
+
+        assertInstanceOf(KafkaException.class, streamsException.getCause());
+        assertEquals("KABOOM!", streamsException.getCause().getMessage());
+    }
+
+    @Test
+    public void shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+            LogAndFailExceptionHandler.class.getName(), ProcessingExceptionHandlerMock.class.getName()));
+
+        final FailedProcessingException streamsException = assertThrows(FailedProcessingException.class,
+            () -> task.punctuate(processorStreamTime, 1, PunctuationType.STREAM_TIME, timestamp -> {
+                throw new KafkaException("KABOOM!");
+            }));
+
+        assertInstanceOf(RuntimeException.class, streamsException.getCause());
+        assertEquals("KABOOM from ProcessingExceptionHandlerMock!", streamsException.getCause().getMessage());
+    }
+
+    public static class ProcessingExceptionHandlerMock implements ProcessingExceptionHandler {
+        @Override
+        public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) {
+            throw new RuntimeException("KABOOM from ProcessingExceptionHandlerMock!");
+        }
+        @Override
+        public void configure(final Map<String, ?> configs) {
+            // No-op
+        }
+    }
 
     private ProcessorStateManager mockStateManager() {
         final ProcessorStateManager manager = mock(ProcessorStateManager.class);
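
For readers following KIP-1033, the sketch below is purely illustrative and not part of this commit: it shows how an application might plug in the ProcessingExceptionHandler that the punctuate() path above now consults. Only the StreamsConfig key and the handle()/configure() signatures are taken from the API visible in the diff; the handler class name, topic, application id, and bootstrap servers are made-up placeholders.

import java.util.Map;
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.processor.api.Record;

// Hypothetical handler: drops the failed operation and lets the task keep processing.
public class MyContinueProcessingExceptionHandler implements ProcessingExceptionHandler {

    @Override
    public ProcessingExceptionHandler.ProcessingHandlerResponse handle(final ErrorHandlerContext context,
                                                                       final Record<?, ?> record,
                                                                       final Exception exception) {
        // For errors thrown from punctuate(), the record argument is null
        // (StreamTask passes null to handle(), as shown in the diff above).
        return ProcessingHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "punctuate-handler-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");        // placeholder
        // The config key referenced by the new log message in StreamTask:
        props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
                  MyContinueProcessingExceptionHandler.class);

        // Minimal placeholder topology so the application can start.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").foreach((key, value) -> { });

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}

Returning CONTINUE is what drives the droppedRecordsSensor.record() branch added above; returning FAIL, or throwing from the handler itself, surfaces the StreamsException / FailedProcessingException instead, matching the new tests.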
