This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b62d8b975c KAFKA-12699: Override the default handler for stream 
threads if the stream's handler is used (#12324)
b62d8b975c is described below

commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500

    KAFKA-12699: Override the default handler for stream threads if the 
stream's handler is used (#12324)
    
    Override the default handler for stream threads if the stream's handler is 
used. We do not want the Java default handler triggering when a thread is 
replaced.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java   | 7 +++++++
 .../StreamsUncaughtExceptionHandlerIntegrationTest.java            | 6 ++++++
 2 files changed, 13 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
                         exception -> handleStreamsUncaughtException(exception, 
userStreamsUncaughtExceptionHandler, false)
                     );
                 }
+                processStreamThread(thread -> 
thread.setUncaughtExceptionHandler((t, e) -> { }
+                ));
+
+                if (globalStreamThread != null) {
+                    globalStreamThread.setUncaughtExceptionHandler((t, e) -> { 
}
+                    );
+                }
             } else {
                 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler before calling start(). " +
                     "Current state is: " + state);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
         testReplaceThreads(2);
     }
 
+    @Test
+    public void shouldReplaceThreadsWithoutJavaHandler() throws 
InterruptedException {
+        Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception 
thrown"));
+        testReplaceThreads(2);
+    }
+
     @Test
     public void shouldReplaceSingleThread() throws InterruptedException {
         testReplaceThreads(1);

Reply via email to