This is an automated email from the ASF dual-hosted git repository.

cadonna pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 615c8c0e116 KAFKA-17850: fix leaking internal exception in state 
manager (#17711)
615c8c0e116 is described below

commit 615c8c0e11678c39904d8bdde991a0c1a4b67625
Author: Sebastien Viale <[email protected]>
AuthorDate: Tue Nov 19 10:51:07 2024 +0100

    KAFKA-17850: fix leaking internal exception in state manager (#17711)
    
    Following the KIP-1033 a FailedProcessingException is passed to the 
Streams-specific uncaught exception handler.
    
    The goal of the PR is to unwrap a FailedProcessingException into a 
StreamsException when an exception occurs during the flushing or closing of a 
store
    
    Reviewer: Bruno Cadonna <[email protected]>
---
 .../ProcessingExceptionHandlerIntegrationTest.java | 94 +++++++++++++++++++++-
 .../processor/internals/ProcessorStateManager.java | 34 ++++++--
 .../internals/ProcessorStateManagerTest.java       | 61 ++++++++++++++
 .../apache/kafka/test/MockCachedKeyValueStore.java | 42 ++++++++++
 4 files changed, 224 insertions(+), 7 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
index 9b848ab9005..13e291e887c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java
@@ -26,6 +26,8 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.ErrorHandlerContext;
+import 
org.apache.kafka.streams.errors.LogAndContinueProcessingExceptionHandler;
+import org.apache.kafka.streams.errors.LogAndFailProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.ProcessingExceptionHandler;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.Consumed;
@@ -108,6 +110,52 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldFailWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsFail()
 {
+        final List<KeyValue<String, String>> events = Arrays.asList(
+            new KeyValue<>("ID123-1", "ID123-A1"),
+            new KeyValue<>("ID123-1", "ID123-A2"),
+            new KeyValue<>("ID123-1", "ID123-A3"),
+            new KeyValue<>("ID123-1", "ID123-A4")
+        );
+
+        final List<KeyValueTimestamp<String, String>> expectedProcessedRecords 
= Arrays.asList(
+            new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli())
+        );
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .count()
+            .toStream()
+            .mapValues(value -> value.toString())
+            .process(runtimeErrorProcessorSupplierMock())
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
LogAndFailProcessingExceptionHandler.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+
+            final StreamsException exception = 
assertThrows(StreamsException.class,
+                () -> inputTopic.pipeKeyValueList(events, TIMESTAMP, 
Duration.ZERO));
+
+            assertTrue(exception.getMessage().contains("Failed to flush cache 
of store KSTREAM-AGGREGATE-STATE-STORE-0000000001"));
+            assertEquals(expectedProcessedRecords.size(), 
processor.theCapturedProcessor().processed().size());
+            assertIterableEquals(expectedProcessedRecords, 
processor.theCapturedProcessor().processed());
+
+            final MetricName dropTotal = droppedRecordsTotalMetric();
+            final MetricName dropRate = droppedRecordsRateMetric();
+
+            assertEquals(0.0, driver.metrics().get(dropTotal).metricValue());
+            assertEquals(0.0, driver.metrics().get(dropRate).metricValue());
+        }
+    }
+
     @Test
     public void 
shouldContinueWhenProcessingExceptionOccursIfExceptionHandlerReturnsContinue() {
         final List<KeyValue<String, String>> events = Arrays.asList(
@@ -153,6 +201,50 @@ public class ProcessingExceptionHandlerIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldContinueWhenProcessingExceptionOccursFromFlushingCacheIfExceptionHandlerReturnsContinue()
 {
+        final List<KeyValue<String, String>> events = Arrays.asList(
+            new KeyValue<>("ID123-1", "ID123-A1"),
+            new KeyValue<>("ID123-1", "ID123-A2"),
+            new KeyValue<>("ID123-1", "ID123-A3"),
+            new KeyValue<>("ID123-1", "ID123-A4")
+        );
+
+        final List<KeyValueTimestamp<String, String>> expectedProcessedRecords 
= Arrays.asList(
+            new KeyValueTimestamp<>("ID123-1", "1", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "2", TIMESTAMP.toEpochMilli()),
+            new KeyValueTimestamp<>("ID123-1", "4", TIMESTAMP.toEpochMilli())
+        );
+
+        final MockProcessorSupplier<String, String, Void, Void> processor = 
new MockProcessorSupplier<>();
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder
+            .stream("TOPIC_NAME", Consumed.with(Serdes.String(), 
Serdes.String()))
+            .groupByKey()
+            .count()
+            .toStream()
+            .mapValues(value -> value.toString())
+            .process(runtimeErrorProcessorSupplierMock())
+            .process(processor);
+
+        final Properties properties = new Properties();
+        
properties.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG, 
LogAndContinueProcessingExceptionHandler.class);
+
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), properties, Instant.ofEpochMilli(0L))) {
+            final TestInputTopic<String, String> inputTopic = 
driver.createInputTopic("TOPIC_NAME", new StringSerializer(), new 
StringSerializer());
+            inputTopic.pipeKeyValueList(events, TIMESTAMP, Duration.ZERO);
+
+            assertEquals(expectedProcessedRecords.size(), 
processor.theCapturedProcessor().processed().size());
+            assertIterableEquals(expectedProcessedRecords, 
processor.theCapturedProcessor().processed());
+
+            final MetricName dropTotal = droppedRecordsTotalMetric();
+            final MetricName dropRate = droppedRecordsRateMetric();
+
+            assertEquals(1.0, driver.metrics().get(dropTotal).metricValue());
+            assertTrue((Double) driver.metrics().get(dropRate).metricValue() > 
0.0);
+        }
+    }
+
     @Test
     public void 
shouldStopOnFailedProcessorWhenProcessingExceptionOccursInFailProcessingExceptionHandler()
 {
         final KeyValue<String, String> event = new KeyValue<>("ID123-1", 
"ID123-A1");
@@ -377,7 +469,7 @@ public class ProcessingExceptionHandlerIntegrationTest {
         return () -> new ContextualProcessor<String, String, String, String>() 
{
             @Override
             public void process(final Record<String, String> record) {
-                if (record.key().contains("ERR")) {
+                if (record.key().contains("ERR") || 
record.value().equals("3")) {
                     throw new RuntimeException("Exception should be handled by 
processing exception handler");
                 }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 30334abc53e..99092dbb4fa 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateRestoreListener;
@@ -538,13 +539,20 @@ public class ProcessorStateManager implements 
StateManager {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by 
Streams itself
-                        if (exception instanceof StreamsException)
+                        // In case of FailedProcessingException Do not keep 
the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush state store %s", 
logPrefix, store.name()),
+                                exception.getCause());
+                        else if (exception instanceof StreamsException)
                             firstException = exception;
                         else
                             firstException = new ProcessorStateException(
                                 format("%sFailed to flush state store %s", 
logPrefix, store.name()), exception);
+                        log.error("Failed to flush state store {}: ", 
store.name(), firstException);
+                    } else {
+                        log.error("Failed to flush state store {}: ", 
store.name(), exception);
                     }
-                    log.error("Failed to flush state store {}: ", 
store.name(), exception);
                 }
             }
         }
@@ -573,7 +581,12 @@ public class ProcessorStateManager implements StateManager 
{
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by 
Streams itself
-                        if (exception instanceof StreamsException) {
+                        // In case of FailedProcessingException Do not keep 
the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException) {
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to flush cache of store %s", 
logPrefix, store.name()),
+                                exception.getCause());
+                        } else if (exception instanceof StreamsException) {
                             firstException = exception;
                         } else {
                             firstException = new ProcessorStateException(
@@ -581,8 +594,10 @@ public class ProcessorStateManager implements StateManager 
{
                                 exception
                             );
                         }
+                        log.error("Failed to flush cache of store {}: ", 
store.name(), firstException);
+                    } else {
+                        log.error("Failed to flush cache of store {}: ", 
store.name(), exception);
                     }
-                    log.error("Failed to flush cache of store {}: ", 
store.name(), exception);
                 }
             }
         }
@@ -618,13 +633,20 @@ public class ProcessorStateManager implements 
StateManager {
                 } catch (final RuntimeException exception) {
                     if (firstException == null) {
                         // do NOT wrap the error if it is actually caused by 
Streams itself
-                        if (exception instanceof StreamsException)
+                        // In case of FailedProcessingException Do not keep 
the failed processing exception in the stack trace
+                        if (exception instanceof FailedProcessingException)
+                            firstException = new ProcessorStateException(
+                                format("%sFailed to close state store %s", 
logPrefix, store.name()),
+                                exception.getCause());
+                        else if (exception instanceof StreamsException)
                             firstException = exception;
                         else
                             firstException = new ProcessorStateException(
                                 format("%sFailed to close state store %s", 
logPrefix, store.name()), exception);
+                        log.error("Failed to close state store {}: ", 
store.name(), firstException);
+                    } else {
+                        log.error("Failed to close state store {}: ", 
store.name(), exception);
                     }
-                    log.error("Failed to close state store {}: ", 
store.name(), exception);
                 }
             }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index a46c54ee05b..da6c418bd1e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
+import org.apache.kafka.streams.errors.internals.FailedProcessingException;
 import org.apache.kafka.streams.processor.CommitCallback;
 import org.apache.kafka.streams.processor.StateRestoreCallback;
 import org.apache.kafka.streams.processor.StateStore;
@@ -38,6 +39,7 @@ import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.internals.CachedStateStore;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.StoreQueryUtils;
+import org.apache.kafka.test.MockCachedKeyValueStore;
 import org.apache.kafka.test.MockKeyValueStore;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.TestUtils;
@@ -57,6 +59,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -771,6 +774,64 @@ public class ProcessorStateManagerTest {
         assertEquals(exception, thrown);
     }
 
+    @Test
+    public void 
shouldThrowProcessorStateExceptionOnFlushIfStoreThrowsAFailedProcessingException()
 {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = 
getStateManager(Task.TaskType.ACTIVE);
+        final MockKeyValueStore stateStore = new 
MockKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void flush() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = 
assertThrows(ProcessorStateException.class, stateManager::flush);
+        assertEquals(exception, thrown.getCause());
+        
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> 
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+    }
+
+    @Test
+    public void 
shouldThrowProcessorStateExceptionOnFlushCacheIfStoreThrowsAFailedProcessingException()
 {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = 
getStateManager(Task.TaskType.ACTIVE);
+        final MockCachedKeyValueStore stateStore = new 
MockCachedKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void flushCache() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = 
assertThrows(ProcessorStateException.class, stateManager::flushCache);
+        assertEquals(exception, thrown.getCause());
+        
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> 
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+
+    }
+
+    @Test
+    public void 
shouldThrowProcessorStateExceptionOnCloseIfStoreThrowsAFailedProcessingException()
 {
+        final RuntimeException exception = new RuntimeException("KABOOM!");
+        final ProcessorStateManager stateManager = 
getStateManager(Task.TaskType.ACTIVE);
+        final MockKeyValueStore stateStore = new 
MockKeyValueStore(persistentStoreName, true) {
+            @Override
+            public void close() {
+                throw new FailedProcessingException("processor", exception);
+            }
+        };
+        stateManager.registerStore(stateStore, 
stateStore.stateRestoreCallback, null);
+
+        final ProcessorStateException thrown = 
assertThrows(ProcessorStateException.class, stateManager::close);
+        assertEquals(exception, thrown.getCause());
+        
assertFalse(exception.getMessage().contains("FailedProcessingException"));
+        assertFalse(Arrays.stream(thrown.getStackTrace()).anyMatch(
+            element -> 
element.getClassName().contains(FailedProcessingException.class.getSimpleName())));
+    }
+
     @Test
     public void shouldThrowIfRestoringUnregisteredStore() {
         final ProcessorStateManager stateManager = 
getStateManager(Task.TaskType.ACTIVE);
diff --git 
a/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java 
b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java
new file mode 100644
index 00000000000..ee55757f7bf
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/test/MockCachedKeyValueStore.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.test;
+
+import org.apache.kafka.streams.state.internals.CacheFlushListener;
+import org.apache.kafka.streams.state.internals.CachedStateStore;
+
+public class MockCachedKeyValueStore extends MockKeyValueStore implements 
CachedStateStore<Object, Object> {
+
+    public MockCachedKeyValueStore(String name, boolean persistent) {
+        super(name, persistent);
+    }
+
+    @Override
+    public boolean setFlushListener(CacheFlushListener<Object, Object> 
listener, boolean sendOldValues) {
+        return false;
+    }
+
+    @Override
+    public void flushCache() {
+
+    }
+
+    @Override
+    public void clearCache() {
+
+    }
+}

Reply via email to