This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c903bdf4967 KAFKA-12827 Remove Deprecated method 
KafkaStreams#setUncaughtExceptionHandler (#16988)
c903bdf4967 is described below

commit c903bdf496761713d0ed8bed1cd087e6fa562dc2
Author: Abhishek Giri <[email protected]>
AuthorDate: Wed Nov 6 07:08:32 2024 +0100

    KAFKA-12827 Remove Deprecated method 
KafkaStreams#setUncaughtExceptionHandler (#16988)
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/integration/EosIntegrationTest.java    |  6 +--
 .../integration/QueryableStateIntegrationTest.java | 24 ++++++----
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 27 -----------
 .../org/apache/kafka/streams/KafkaStreams.java     | 55 ++--------------------
 4 files changed, 21 insertions(+), 91 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 4ccba92686a..9d6940a3b5e 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -418,7 +419,6 @@ public class EosIntegrationTest {
                 uncommittedRecords,
                 dataBeforeFailure,
                 "The uncommitted records before failure do not match what 
expected");
-
             errorInjected.set(true);
             writeInputData(dataAfterFailure);
 
@@ -1104,7 +1104,7 @@ public class EosIntegrationTest {
 
         final KafkaStreams streams = new KafkaStreams(builder.build(), config);
 
-        streams.setUncaughtExceptionHandler((t, e) -> {
+        streams.setUncaughtExceptionHandler(e -> {
             if (uncaughtException != null ||
                 !(e instanceof StreamsException) ||
                 !e.getCause().getMessage().equals("Injected test exception.")) 
{
@@ -1112,8 +1112,8 @@ public class EosIntegrationTest {
                 hasUnexpectedError = true;
             }
             uncaughtException = e;
+            return 
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
         });
-
         return streams;
     }
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6ae15dcdf03..6b67af943fc 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -103,6 +103,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkProperties;
 import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static 
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
 import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
@@ -1032,14 +1033,14 @@ public class QueryableStateIntegrationTest {
 
         final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
         IntegrationTestUtils.produceKeyValuesSynchronously(
-                streamThree,
-                Arrays.asList(hello, hello, hello, hello, hello, hello, hello, 
hello),
-                TestUtils.producerConfig(
-                        CLUSTER.bootstrapServers(),
-                        StringSerializer.class,
-                        StringSerializer.class,
-                        new Properties()),
-                mockTime);
+            streamThree,
+            Arrays.asList(hello, hello, hello, hello, hello, hello, hello, 
hello),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            mockTime);
 
         final int maxWaitMs = 30000;
 
@@ -1073,8 +1074,8 @@ public class QueryableStateIntegrationTest {
 
     }
 
+
     @Test
-    @Deprecated //A single thread should no longer die
     public void shouldAllowToQueryAfterThreadDied() throws Exception {
         final AtomicBoolean beforeFailure = new AtomicBoolean(true);
         final AtomicBoolean failed = new AtomicBoolean(false);
@@ -1097,7 +1098,10 @@ public class QueryableStateIntegrationTest {
 
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
         kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+        kafkaStreams.setUncaughtExceptionHandler(exception -> {
+            failed.set(true);
+            return REPLACE_THREAD;
+        });
 
         startApplicationAndWaitUntilRunning(kafkaStreams);
 
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 2ccc36aab24..91ae7748f01 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -77,7 +77,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.fail;
 
-@SuppressWarnings("deprecation") //Need to call the old handler, will remove 
those calls when the old handler is removed
 @Tag("integration")
 @Timeout(600)
 public class StreamsUncaughtExceptionHandlerIntegrationTest {
@@ -141,30 +140,9 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
         purgeLocalStreamsState(properties);
     }
 
-    @Test
-    public void shouldShutdownThreadUsingOldHandler() throws Exception {
-        try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-            final AtomicInteger counter = new AtomicInteger(0);
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> 
counter.incrementAndGet());
-
-            startApplicationAndWaitUntilRunning(kafkaStreams);
-            produceMessages(NOW, inputTopic, "A");
-
-            // should call the UncaughtExceptionHandler in current thread
-            TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was 
called 1st time");
-            // should call the UncaughtExceptionHandler after rebalancing to 
another thread
-            TestUtils.waitForCondition(() -> counter.get() == 2, 
DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
-            // there is no threads running but the client is still in running
-            waitForApplicationState(Collections.singletonList(kafkaStreams), 
KafkaStreams.State.RUNNING, DEFAULT_DURATION);
-
-            assertThat(processorValueCollector.size(), equalTo(2));
-        }
-    }
-
     @Test
     public void shouldShutdownClient() throws Exception {
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
 
             kafkaStreams.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_CLIENT);
 
@@ -249,7 +227,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
 
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
             kafkaStreams.setUncaughtExceptionHandler(exception -> 
REPLACE_THREAD);
 
             startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -360,8 +337,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
 
         try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, 
properties);
              final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, 
properties)) {
-            kafkaStreams1.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
-            kafkaStreams2.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
             kafkaStreams1.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_APPLICATION);
             kafkaStreams2.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_APPLICATION);
 
@@ -377,8 +352,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
     private void testReplaceThreads(final int numThreads) throws Exception {
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
         try (final KafkaStreams kafkaStreams = new 
KafkaStreams(builder.build(), properties)) {
-            kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should 
not hit old handler"));
-
             final AtomicInteger count = new AtomicInteger();
             kafkaStreams.setUncaughtExceptionHandler(exception -> {
                 if (count.incrementAndGet() == numThreads) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5a59a79ef5a..cc3c3aa9103 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -188,7 +188,6 @@ public class KafkaStreams implements AutoCloseable {
     GlobalStreamThread globalStreamThread;
     protected StateDirectory stateDirectory = null;
     private KafkaStreams.StateListener stateListener;
-    private boolean oldHandler;
     private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
     private final Object changeThreadCount = new Object();
 
@@ -433,32 +432,6 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    /**
-     * Set the handler invoked when an internal {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread} abruptly
-     * terminates due to an uncaught exception.
-     *
-     * @param uncaughtExceptionHandler the uncaught exception handler for all 
internal threads; {@code null} deletes the current handler
-     * @throws IllegalStateException if this {@code KafkaStreams} instance has 
already been started.
-     *
-     * @deprecated Since 2.8.0. Use {@link 
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)} 
instead.
-     *
-     */
-    @Deprecated
-    public void setUncaughtExceptionHandler(final 
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
-        synchronized (stateLock) {
-            if (state.hasNotStarted()) {
-                oldHandler = true;
-                processStreamThread(thread -> 
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler));
-
-                if (globalStreamThread != null) {
-                    
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
-                }
-            } else {
-                throw new IllegalStateException("Can only set 
UncaughtExceptionHandler before calling start(). " +
-                    "Current state is: " + state);
-            }
-        }
-    }
 
     /**
      * Set the handler invoked when an internal {@link 
StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread}
@@ -502,21 +475,6 @@ public class KafkaStreams implements AutoCloseable {
         }
     }
 
-    private void defaultStreamsUncaughtExceptionHandler(final Throwable 
throwable, final boolean skipThreadReplacement) {
-        if (oldHandler) {
-            threads.remove(Thread.currentThread());
-            if (throwable instanceof RuntimeException) {
-                throw (RuntimeException) throwable;
-            } else if (throwable instanceof Error) {
-                throw (Error) throwable;
-            } else {
-                throw new RuntimeException("Unexpected checked exception 
caught in the uncaught exception handler", throwable);
-            }
-        } else {
-            handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, 
skipThreadReplacement);
-        }
-    }
-
     private void replaceStreamThread(final Throwable throwable) {
         if (globalStreamThread != null && 
Thread.currentThread().getName().equals(globalStreamThread.getName())) {
             log.warn("The global thread cannot be replaced. Reverting to 
shutting down the client.");
@@ -540,10 +498,7 @@ public class KafkaStreams implements AutoCloseable {
                                                 final 
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
                                                 final boolean 
skipThreadReplacement) {
         final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse 
action = streamsUncaughtExceptionHandler.handle(throwable);
-        if (oldHandler) {
-            log.warn("Stream's new uncaught exception handler is set as well 
as the deprecated old handler." +
-                    "The old handler will be ignored as long as a new handler 
is set.");
-        }
+
         switch (action) {
             case REPLACE_THREAD:
                 if (!skipThreadReplacement) {
@@ -1039,9 +994,7 @@ public class KafkaStreams implements AutoCloseable {
             
parseHostInfo(applicationConfigs.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
             logContext
         );
-
-        oldHandler = false;
-        streamsUncaughtExceptionHandler = 
this::defaultStreamsUncaughtExceptionHandler;
+        streamsUncaughtExceptionHandler = (throwable, skipThreadReplacement) 
-> handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT, 
skipThreadReplacement);
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
         delegatingStandbyUpdateListener = new 
DelegatingStandbyUpdateListener();
 
@@ -1062,7 +1015,7 @@ public class KafkaStreams implements AutoCloseable {
                 time,
                 globalThreadId,
                 delegatingStateRestoreListener,
-                exception -> defaultStreamsUncaughtExceptionHandler(exception, 
false)
+                exception -> handleStreamsUncaughtException(exception, t -> 
SHUTDOWN_CLIENT, false)
             );
             globalThreadState = globalStreamThread.state();
         }
@@ -1407,7 +1360,7 @@ public class KafkaStreams implements AutoCloseable {
      * However, if you have global stores in your topology, this method blocks 
until all global stores are restored.
      * As a consequence, any fatal exception that happens during processing is 
by default only logged.
      * If you want to be notified about dying threads, you can
-     * {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler) 
register an uncaught exception handler}
+     * {@link #setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler) 
register an uncaught exception handler}
      * before starting the {@code KafkaStreams} instance.
      * <p>
      * Note, for brokers with version {@code 0.9.x} or lower, the broker 
version cannot be checked.

Reply via email to