This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f1ef7a5a9f8 KAFKA-16448: Handle processing exceptions in punctuate
(#16300)
f1ef7a5a9f8 is described below
commit f1ef7a5a9f87cda7ae1209fc2285259eba4204ca
Author: Sebastien Viale <[email protected]>
AuthorDate: Thu Aug 1 00:53:47 2024 +0200
KAFKA-16448: Handle processing exceptions in punctuate (#16300)
This PR is part of KIP-1033, which aims to bring a
ProcessingExceptionHandler to Kafka Streams in order to deal with exceptions
that occur during processing.
This PR actually catches processing exceptions from punctuate.
Co-authored-by: Dabz <[email protected]>
Co-authored-by: loicgreffier <[email protected]>
Reviewers: Bruno Cadonna <[email protected]>, Matthias J. Sax
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../streams/processor/internals/StreamTask.java | 44 ++++++-
.../processor/internals/StreamTaskTest.java | 137 ++++++++++++++++++++-
3 files changed, 176 insertions(+), 7 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba28341d1a3..8c96ce71237 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -193,7 +193,7 @@
<!-- Streams -->
<suppress checks="ClassFanOutComplexity"
-
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore).java"/>
+
files="(KafkaStreams|KStreamImpl|KTableImpl|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|IQv2StoreIntegrationTest|KStreamImplTest|RocksDBStore|StreamTask).java"/>
<suppress checks="MethodLength"
files="KTableImpl.java"/>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8b253c6e16a..6f2edd442b0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -29,12 +29,14 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.TopologyConfig.TaskConfig;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.Cancellable;
import org.apache.kafka.streams.processor.PunctuationType;
@@ -63,6 +65,7 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import static java.util.Collections.singleton;
+import static
org.apache.kafka.streams.StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeRecordSensor;
@@ -101,6 +104,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
private final Sensor restoreRemainingSensor;
private final Sensor punctuateLatencySensor;
private final Sensor bufferedRecordsSensor;
+ private final Sensor droppedRecordsSensor;
private final Map<String, Sensor> e2eLatencySensors = new HashMap<>();
private final RecordQueueCreator recordQueueCreator;
@@ -160,6 +164,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
processLatencySensor = TaskMetrics.processLatencySensor(threadId,
taskId, streamsMetrics);
punctuateLatencySensor = TaskMetrics.punctuateSensor(threadId, taskId,
streamsMetrics);
bufferedRecordsSensor =
TaskMetrics.activeBufferedRecordsSensor(threadId, taskId, streamsMetrics);
+ droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(threadId,
taskId, streamsMetrics);
for (final String terminalNodeName : topology.terminalNodes()) {
e2eLatencySensors.put(
@@ -915,15 +920,48 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
try {
maybeMeasureLatency(() -> punctuator.punctuate(timestamp), time,
punctuateLatencySensor);
- } catch (final StreamsException e) {
+ } catch (final FailedProcessingException e) {
+ throw createStreamsException(node.name(), e.getCause());
+ } catch (final TaskCorruptedException | TaskMigratedException e) {
throw e;
- } catch (final RuntimeException e) {
- throw new StreamsException(String.format("%sException caught while
punctuating processor '%s'", logPrefix, node.name()), e);
+ } catch (final Exception e) {
+ final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
+ null,
+ recordContext.topic(),
+ recordContext.partition(),
+ recordContext.offset(),
+ recordContext.headers(),
+ node.name(),
+ id()
+ );
+
+ final ProcessingExceptionHandler.ProcessingHandlerResponse
response;
+
+ try {
+ response =
processingExceptionHandler.handle(errorHandlerContext, null, e);
+ } catch (final Exception fatalUserException) {
+ throw new FailedProcessingException(fatalUserException);
+ }
+
+ if (response ==
ProcessingExceptionHandler.ProcessingHandlerResponse.FAIL) {
+ log.error("Processing exception handler is set to fail upon" +
+ " a processing error. If you would rather have the
streaming pipeline" +
+ " continue after a processing error, please set the " +
+ PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG + "
appropriately.");
+
+ throw createStreamsException(node.name(), e);
+ } else {
+ droppedRecordsSensor.record();
+ }
} finally {
processorContext.setCurrentNode(null);
}
}
+ private StreamsException createStreamsException(final String
processorName, final Throwable cause) {
+ return new StreamsException(String.format("%sException caught while
punctuating processor '%s'", logPrefix, processorName), cause);
+ }
+
@SuppressWarnings("unchecked")
private void updateProcessorContext(final ProcessorNode<?, ?, ?, ?>
currNode,
final long wallClockTime,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 817ffe1f74d..a8771c21539 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
@@ -45,14 +46,19 @@ import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.errors.TopologyException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
@@ -121,6 +127,7 @@ import static org.hamcrest.Matchers.not;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -240,13 +247,18 @@ public class StreamTaskTest {
}
private static StreamsConfig createConfig(final String eosConfig, final
String enforcedProcessingValue) {
- return createConfig(eosConfig, enforcedProcessingValue,
LogAndFailExceptionHandler.class.getName());
+ return createConfig(eosConfig, enforcedProcessingValue,
LogAndFailExceptionHandler.class.getName(),
LogAndFailProcessingExceptionHandler.class.getName());
+ }
+
+ private static StreamsConfig createConfig(final String eosConfig, final
String enforcedProcessingValue, final String deserializationExceptionHandler) {
+ return createConfig(eosConfig, enforcedProcessingValue,
deserializationExceptionHandler,
LogAndFailProcessingExceptionHandler.class.getName());
}
private static StreamsConfig createConfig(
final String eosConfig,
final String enforcedProcessingValue,
- final String deserializationExceptionHandler) {
+ final String deserializationExceptionHandler,
+ final String processingExceptionHandler) {
final String canonicalPath;
try {
canonicalPath = BASE_DIR.getCanonicalPath();
@@ -262,7 +274,8 @@ public class StreamTaskTest {
mkEntry(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
MockTimestampExtractor.class.getName()),
mkEntry(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, eosConfig),
mkEntry(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG,
enforcedProcessingValue),
-
mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
deserializationExceptionHandler)
+
mkEntry(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
deserializationExceptionHandler),
+ mkEntry(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
processingExceptionHandler)
)));
}
@@ -2647,6 +2660,124 @@ public class StreamTaskTest {
verify(recordCollector, never()).offsets();
}
+ @Test
+ public void
shouldPunctuateNotHandleFailProcessingExceptionAndThrowStreamsException() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final StreamsException streamsException =
assertThrows(StreamsException.class, () ->
+ task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw new FailedProcessingException(
+ new RuntimeException("KABOOM!")
+ );
+ })
+ );
+
+ assertInstanceOf(RuntimeException.class, streamsException.getCause());
+ assertEquals("KABOOM!", streamsException.getCause().getMessage());
+ }
+
+ @Test
+ public void shouldPunctuateNotHandleTaskCorruptedExceptionAndThrowItAsIs()
{
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final Set<TaskId> tasksIds = new HashSet<>();
+ tasksIds.add(new TaskId(0, 0));
+ final TaskCorruptedException expectedException = new
TaskCorruptedException(tasksIds, new InvalidOffsetException("Invalid offset") {
+ @Override
+ public Set<TopicPartition> partitions() {
+ return new HashSet<>(Collections.singletonList(new
TopicPartition("topic", 0)));
+ }
+ });
+
+ final TaskCorruptedException taskCorruptedException =
assertThrows(TaskCorruptedException.class, () ->
+ task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw expectedException;
+ })
+ );
+
+ assertEquals(expectedException, taskCorruptedException);
+ }
+
+ @Test
+ public void shouldPunctuateNotHandleTaskMigratedExceptionAndThrowItAsIs() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ final TaskMigratedException expectedException = new
TaskMigratedException("TaskMigratedException", new RuntimeException("Task
migrated cause"));
+
+ final TaskMigratedException taskCorruptedException =
assertThrows(TaskMigratedException.class, () ->
+ task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw expectedException;
+ })
+ );
+
+ assertEquals(expectedException, taskCorruptedException);
+ }
+
+ @Test
+ public void
shouldPunctuateNotThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithContinue()
{
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
LogAndContinueProcessingExceptionHandler.class.getName()));
+
+ assertDoesNotThrow(() ->
+ task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ })
+ );
+ }
+
+ @Test
+ public void
shouldPunctuateThrowStreamsExceptionWhenProcessingExceptionHandlerRepliesWithFail()
{
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
LogAndFailProcessingExceptionHandler.class.getName()));
+
+ final StreamsException streamsException =
assertThrows(StreamsException.class,
+ () -> task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ }));
+
+ assertInstanceOf(KafkaException.class, streamsException.getCause());
+ assertEquals("KABOOM!", streamsException.getCause().getMessage());
+ }
+
+ @Test
+ public void
shouldPunctuateThrowFailedProcessingExceptionWhenProcessingExceptionHandlerThrowsAnException()
{
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatelessTask(createConfig(AT_LEAST_ONCE, "100",
+ LogAndFailExceptionHandler.class.getName(),
ProcessingExceptionHandlerMock.class.getName()));
+
+ final FailedProcessingException streamsException =
assertThrows(FailedProcessingException.class,
+ () -> task.punctuate(processorStreamTime, 1,
PunctuationType.STREAM_TIME, timestamp -> {
+ throw new KafkaException("KABOOM!");
+ }));
+
+ assertInstanceOf(RuntimeException.class, streamsException.getCause());
+ assertEquals("KABOOM from ProcessingExceptionHandlerMock!",
streamsException.getCause().getMessage());
+ }
+
+ public static class ProcessingExceptionHandlerMock implements
ProcessingExceptionHandler {
+ @Override
+ public ProcessingExceptionHandler.ProcessingHandlerResponse
handle(final ErrorHandlerContext context, final Record<?, ?> record, final
Exception exception) {
+ throw new RuntimeException("KABOOM from
ProcessingExceptionHandlerMock!");
+ }
+ @Override
+ public void configure(final Map<String, ?> configs) {
+ // No-op
+ }
+ }
private ProcessorStateManager mockStateManager() {
final ProcessorStateManager manager =
mock(ProcessorStateManager.class);