This is an automated email from the ASF dual-hosted git repository.
cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 615c8c0e116 KAFKA-17850: fix leaking internal exception in state
manager (#17711)
615c8c0e116 is described below
commit 615c8c0e11678c39904d8bdde991a0c1a4b67625
Author: Sebastien Viale <[email protected]>
AuthorDate: Tue Nov 19 10:51:07 2024 +0100
KAFKA-17850: fix leaking internal exception in state manager (#17711)
Following the KIP-1033 a FailedProcessingException is passed to the
Streams-specific uncaught exception handler.
The goal of the PR is to unwrap a FailedProcessingException into a
StreamsException when an exception occurs during the flushing or closing of a
store
Reviewer: Bruno Cadonna <[email protected]>
---
.../ProcessingExceptionHandlerIntegrationTest.java | 94 +++++++++++++++++++++-
.../processor/internals/ProcessorStateManager.java | 34 ++++++--
.../internals/ProcessorStateManagerTest.java | 61 ++++++++++++++
.../apache/kafka/test/MockCachedKeyValueStore.java | 42 ++++++++++
4 files changed, 224 insertions(+), 7 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 9b848ab9005..13e291e887c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.kstream.Consumed;
@@ -108,6 +110,52 @@ public class ProcessingExceptionHandlerIntegrationTest {
}
}
+ @Test
+ public void
shouldFailWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsFail()
{
+ final List<KeyValue<String, String>> events = Arrays.asList(
+ new KeyValue<>("ID123-1", "ID123-A1"),
+ new KeyValue<>("ID123-1", "ID123-A2"),
+ new KeyValue<>("ID123-1", "ID123-A3"),
+ new KeyValue<>("ID123-1", "ID123-A4")
+ );
+
+ final List<KeyValueTimestamp<String, String>> expectedProcessedRecords
= Arrays.asList(
+ new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli())
+ );
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .count()
+ .toStream()
+ .mapValues(value -> value.toString())
+ .process(runtimeErrorProcessorSupplierMock())
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndFailProcessingExceptionHandler.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+
+ final StreamsException exception =
assertThrows(StreamsException.class,
+ () -> inputTopic.pipeKeyValueList(events, TIMESTAMP,
Duration.ZERO));
+
+ assertTrue(exception.getMessage().contains("Failed to flush cache
of store KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
+ assertEquals(expectedProcessedRecords.size(),
processor.theCapturedProcessor().processed().size());
+ assertIterableEquals(expectedProcessedRecords,
processor.theCapturedProcessor().processed());
+
+ final MetricName dropTotal = droppedRecordsTotalMetric();
+ final MetricName dropRate = droppedRecordsRateMetric();
+
+ assertEquals(0.0, driver.metrics().get(dropTotal).metricValue());
+ assertEquals(0.0, driver.metrics().get(dropRate).metricValue());
+ }
+ }
+
@Test
public void
shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() {
final List<KeyValue<String, String>> events = Arrays.asList(
@@ -153,6 +201,50 @@ public class ProcessingExceptionHandlerIntegrationTest {
}
}
+ @Test
+ public void
shouldContinueWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsContinue()
{
+ final List<KeyValue<String, String>> events = Arrays.asList(
+ new KeyValue<>("ID123-1", "ID123-A1"),
+ new KeyValue<>("ID123-1", "ID123-A2"),
+ new KeyValue<>("ID123-1", "ID123-A3"),
+ new KeyValue<>("ID123-1", "ID123-A4")
+ );
+
+ final List<KeyValueTimestamp<String, String>> expectedProcessedRecords
= Arrays.asList(
+ new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli()),
+ new KeyValueTimestamp<>("ID123-1", "4", TIMESTAMP.toEpochMilli())
+ );
+
+ final MockProcessorSupplier<String, String, Void, Void> processor =
new MockProcessorSupplier<>();
+ final StreamsBuilder builder = new StreamsBuilder();
+ builder
+ .stream("TOPIC_NAME", Consumed.with(Serdes.String(),
Serdes.String()))
+ .groupByKey()
+ .count()
+ .toStream()
+ .mapValues(value -> value.toString())
+ .process(runtimeErrorProcessorSupplierMock())
+ .process(processor);
+
+ final Properties properties = new Properties();
+
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
LogAndContinueProcessingExceptionHandler.class);
+
+ try (final TopologyTestDriver driver = new
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+ final TestInputTopic<String, String> inputTopic =
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new
StringSerializer());
+ inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
+
+ assertEquals(expectedProcessedRecords.size(),
processor.theCapturedProcessor().processed().size());
+ assertIterableEquals(expectedProcessedRecords,
processor.theCapturedProcessor().processed());
+
+ final MetricName dropTotal = droppedRecordsTotalMetric();
+ final MetricName dropRate = droppedRecordsRateMetric();
+
+ assertEquals(1.0, driver.metrics().get(dropTotal).metricValue());
+ assertTrue((Double) driver.metrics().get(dropRate).metricValue() >
0.0);
+ }
+ }
+
@Test
public void
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler()
{
final KeyValue<String, String> event = new KeyValue<>("ID123-1",
"ID123-A1");
@@ -377,7 +469,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
return () -> new ContextualProcessor<String, String, String, String>()
{
@Override
public void process(final Record<String, String> record) {
- if (record.key().contains("ERR")) {
+ if (record.key().contains("ERR") ||
record.value().equals("3")) {
throw new RuntimeException("Exception should be handled by
processing exception handler");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30334abc53e..99092dbb4fa 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -538,13 +539,20 @@ public class ProcessorStateManager implements
StateManager {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by
Streams itself
- if (exception instanceof StreamsException)
+ // In case of FailedProcessingException Do not keep
the failed processing exception in the stack trace
+ if (exception instanceof FailedProcessingException)
+ firstException = new ProcessorStateException(
+ format("%sFailed to flush state store %s",
logPrefix, store.name()),
+ exception.getCause());
+ else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to flush state store %s",
logPrefix, store.name()), exception);
+ log.error("Failed to flush state store {}: ",
store.name(), firstException);
+ } else {
+ log.error("Failed to flush state store {}: ",
store.name(), exception);
}
- log.error("Failed to flush state store {}: ",
store.name(), exception);
}
}
}
@@ -573,7 +581,12 @@ public class ProcessorStateManager implements StateManager
{
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by
Streams itself
- if (exception instanceof StreamsException) {
+ // In case of FailedProcessingException Do not keep
the failed processing exception in the stack trace
+ if (exception instanceof FailedProcessingException) {
+ firstException = new ProcessorStateException(
+ format("%sFailed to flush cache of store %s",
logPrefix, store.name()),
+ exception.getCause());
+ } else if (exception instanceof StreamsException) {
firstException = exception;
} else {
firstException = new ProcessorStateException(
@@ -581,8 +594,10 @@ public class ProcessorStateManager implements StateManager
{
exception
);
}
+ log.error("Failed to flush cache of store {}: ",
store.name(), firstException);
+ } else {
+ log.error("Failed to flush cache of store {}: ",
store.name(), exception);
}
- log.error("Failed to flush cache of store {}: ",
store.name(), exception);
}
}
}
@@ -618,13 +633,20 @@ public class ProcessorStateManager implements
StateManager {
} catch (final RuntimeException exception) {
if (firstException == null) {
// do NOT wrap the error if it is actually caused by
Streams itself
- if (exception instanceof StreamsException)
+ // In case of FailedProcessingException Do not keep
the failed processing exception in the stack trace
+ if (exception instanceof FailedProcessingException)
+ firstException = new ProcessorStateException(
+ format("%sFailed to close state store %s",
logPrefix, store.name()),
+ exception.getCause());
+ else if (exception instanceof StreamsException)
firstException = exception;
else
firstException = new ProcessorStateException(
format("%sFailed to close state store %s",
logPrefix, store.name()), exception);
+ log.error("Failed to close state store {}: ",
store.name(), firstException);
+ } else {
+ log.error("Failed to close state store {}: ",
store.name(), exception);
}
- log.error("Failed to close state store {}: ",
store.name(), exception);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a46c54ee05b..da6c418bd1e 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
import org.apache.kafka.streams.processor.CommitCallback;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateStore;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.internals.CachedStateStore;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.streams.state.internals.StoreQueryUtils;
+import org.apache.kafka.test.MockCachedKeyValueStore;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.TestUtils;
@@ -57,6 +59,7 @@ import java.io.FileWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -771,6 +774,64 @@ public class ProcessorStateManagerTest {
assertEquals(exception, thrown);
}
+ @Test
+ public void
shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException()
{
+ final RuntimeException exception = new RuntimeException("KABOOM!");
+ final ProcessorStateManager stateManager =
getStateManager(Task.TaskType.ACTIVE);
+ final MockKeyValueStore stateStore = new
MockKeyValueStore(persistentStoreName, true) {
+ @Override
+ public void flush() {
+ throw new FailedProcessingException("processor", exception);
+ }
+ };
+ stateManager.registerStore(stateStore,
stateStore.stateRestoreCallback, null);
+
+ final ProcessorStateException thrown =
assertThrows(ProcessorStateException.class, stateManager::flush);
+ assertEquals(exception, thrown.getCause());
+
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+ assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+ element ->
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionOnFlushCacheIfStoreThrowsAFailedProcessingException()
{
+ final RuntimeException exception = new RuntimeException("KABOOM!");
+ final ProcessorStateManager stateManager =
getStateManager(Task.TaskType.ACTIVE);
+ final MockCachedKeyValueStore stateStore = new
MockCachedKeyValueStore(persistentStoreName, true) {
+ @Override
+ public void flushCache() {
+ throw new FailedProcessingException("processor", exception);
+ }
+ };
+ stateManager.registerStore(stateStore,
stateStore.stateRestoreCallback, null);
+
+ final ProcessorStateException thrown =
assertThrows(ProcessorStateException.class, stateManager::flushCache);
+ assertEquals(exception, thrown.getCause());
+
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+ assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+ element ->
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+
+ }
+
+ @Test
+ public void
shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAFailedProcessingException()
{
+ final RuntimeException exception = new RuntimeException("KABOOM!");
+ final ProcessorStateManager stateManager =
getStateManager(Task.TaskType.ACTIVE);
+ final MockKeyValueStore stateStore = new
MockKeyValueStore(persistentStoreName, true) {
+ @Override
+ public void close() {
+ throw new FailedProcessingException("processor", exception);
+ }
+ };
+ stateManager.registerStore(stateStore,
stateStore.stateRestoreCallback, null);
+
+ final ProcessorStateException thrown =
assertThrows(ProcessorStateException.class, stateManager::close);
+ assertEquals(exception, thrown.getCause());
+
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+ assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+ element ->
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+ }
+
@Test
public void shouldThrowIfRestoringUnregisteredStore() {
final ProcessorStateManager stateManager =
getStateManager(Task.TaskType.ACTIVE);
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java
b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java
new file mode 100644
index 00000000000..ee55757f7bf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+
+public class MockCachedKeyValueStore extends MockKeyValueStore implements
CachedStateStore<Object, Object> {
+
+ public MockCachedKeyValueStore(String name, boolean persistent) {
+ super(name, persistent);
+ }
+
+ @Override
+ public boolean setFlushListener(CacheFlushListener<Object, Object>
listener, boolean sendOldValues) {
+ return false;
+ }
+
+ @Override
+ public void flushCache() {
+
+ }
+
+ @Override
+ public void clearCache() {
+
+ }
+}