This is an automated email from the ASF dual-hosted git repository.
ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b62d8b975c KAFKA-12699: Override the default handler for stream
threads if the stream's handler is used (#12324)
b62d8b975c is described below
commit b62d8b975cf97b5c1328b9b03a05fa09b07cf13a
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Jul 19 15:35:26 2022 -0500
KAFKA-12699: Override the default handler for stream threads if the
stream's handler is used (#12324)
Override the default handler for stream threads if the stream's handler is
used. We do no want the java default handler triggering when a thread is
replaced.
Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java | 7 +++++++
.../StreamsUncaughtExceptionHandlerIntegrationTest.java | 6 ++++++
2 files changed, 13 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3a61f05de1..05d99dd172 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -471,6 +471,13 @@ public class KafkaStreams implements AutoCloseable {
exception -> handleStreamsUncaughtException(exception,
userStreamsUncaughtExceptionHandler, false)
);
}
+ processStreamThread(thread ->
thread.setUncaughtExceptionHandler((t, e) -> { }
+ ));
+
+ if (globalStreamThread != null) {
+ globalStreamThread.setUncaughtExceptionHandler((t, e) -> {
}
+ );
+ }
} else {
throw new IllegalStateException("Can only set
UncaughtExceptionHandler before calling start(). " +
"Current state is: " + state);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index be98e8d9fc..4af333a65a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -176,6 +176,12 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
testReplaceThreads(2);
}
+ @Test
+ public void shouldReplaceThreadsWithoutJavaHandler() throws
InterruptedException {
+ Thread.setDefaultUncaughtExceptionHandler((t, e) -> fail("exception
thrown"));
+ testReplaceThreads(2);
+ }
+
@Test
public void shouldReplaceSingleThread() throws InterruptedException {
testReplaceThreads(1);