This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c903bdf4967 KAFKA-12827 Remove Deprecated method
KafkaStreams#setUncaughtExceptionHandler (#16988)
c903bdf4967 is described below
commit c903bdf496761713d0ed8bed1cd087e6fa562dc2
Author: Abhishek Giri <[email protected]>
AuthorDate: Wed Nov 6 07:08:32 2024 +0100
KAFKA-12827 Remove Deprecated method
KafkaStreams#setUncaughtExceptionHandler (#16988)
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/integration/EosIntegrationTest.java | 6 +--
.../integration/QueryableStateIntegrationTest.java | 24 ++++++----
...amsUncaughtExceptionHandlerIntegrationTest.java | 27 -----------
.../org/apache/kafka/streams/KafkaStreams.java | 55 ++--------------------
4 files changed, 21 insertions(+), 91 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 4ccba92686a..9d6940a3b5e 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
@@ -418,7 +419,6 @@ public class EosIntegrationTest {
uncommittedRecords,
dataBeforeFailure,
"The uncommitted records before failure do not match what
expected");
-
errorInjected.set(true);
writeInputData(dataAfterFailure);
@@ -1104,7 +1104,7 @@ public class EosIntegrationTest {
final KafkaStreams streams = new KafkaStreams(builder.build(), config);
- streams.setUncaughtExceptionHandler((t, e) -> {
+ streams.setUncaughtExceptionHandler(e -> {
if (uncaughtException != null ||
!(e instanceof StreamsException) ||
!e.getCause().getMessage().equals("Injected test exception."))
{
@@ -1112,8 +1112,8 @@ public class EosIntegrationTest {
hasUnexpectedError = true;
}
uncaughtException = e;
+ return
StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
});
-
return streams;
}
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6ae15dcdf03..6b67af943fc 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -103,6 +103,7 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkProperties;
import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;
+import static
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getRunningStreams;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore;
@@ -1032,14 +1033,14 @@ public class QueryableStateIntegrationTest {
final KeyValue<String, String> hello = KeyValue.pair("hello", "hello");
IntegrationTestUtils.produceKeyValuesSynchronously(
- streamThree,
- Arrays.asList(hello, hello, hello, hello, hello, hello, hello,
hello),
- TestUtils.producerConfig(
- CLUSTER.bootstrapServers(),
- StringSerializer.class,
- StringSerializer.class,
- new Properties()),
- mockTime);
+ streamThree,
+ Arrays.asList(hello, hello, hello, hello, hello, hello, hello,
hello),
+ TestUtils.producerConfig(
+ CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class,
+ new Properties()),
+ mockTime);
final int maxWaitMs = 30000;
@@ -1073,8 +1074,8 @@ public class QueryableStateIntegrationTest {
}
+
@Test
- @Deprecated //A single thread should no longer die
public void shouldAllowToQueryAfterThreadDied() throws Exception {
final AtomicBoolean beforeFailure = new AtomicBoolean(true);
final AtomicBoolean failed = new AtomicBoolean(false);
@@ -1097,7 +1098,10 @@ public class QueryableStateIntegrationTest {
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> failed.set(true));
+ kafkaStreams.setUncaughtExceptionHandler(exception -> {
+ failed.set(true);
+ return REPLACE_THREAD;
+ });
startApplicationAndWaitUntilRunning(kafkaStreams);
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 2ccc36aab24..91ae7748f01 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -77,7 +77,6 @@ import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.fail;
-@SuppressWarnings("deprecation") //Need to call the old handler, will remove
those calls when the old handler is removed
@Tag("integration")
@Timeout(600)
public class StreamsUncaughtExceptionHandlerIntegrationTest {
@@ -141,30 +140,9 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
purgeLocalStreamsState(properties);
}
- @Test
- public void shouldShutdownThreadUsingOldHandler() throws Exception {
- try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- final AtomicInteger counter = new AtomicInteger(0);
- kafkaStreams.setUncaughtExceptionHandler((t, e) ->
counter.incrementAndGet());
-
- startApplicationAndWaitUntilRunning(kafkaStreams);
- produceMessages(NOW, inputTopic, "A");
-
- // should call the UncaughtExceptionHandler in current thread
- TestUtils.waitForCondition(() -> counter.get() == 1, "Handler was
called 1st time");
- // should call the UncaughtExceptionHandler after rebalancing to
another thread
- TestUtils.waitForCondition(() -> counter.get() == 2,
DEFAULT_DURATION.toMillis(), "Handler was called 2nd time");
- // there is no threads running but the client is still in running
- waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.RUNNING, DEFAULT_DURATION);
-
- assertThat(processorValueCollector.size(), equalTo(2));
- }
- }
-
@Test
public void shouldShutdownClient() throws Exception {
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception ->
SHUTDOWN_CLIENT);
@@ -249,7 +227,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
kafkaStreams.setUncaughtExceptionHandler(exception ->
REPLACE_THREAD);
startApplicationAndWaitUntilRunning(kafkaStreams);
@@ -360,8 +337,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology,
properties);
final KafkaStreams kafkaStreams2 = new KafkaStreams(topology,
properties)) {
- kafkaStreams1.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
- kafkaStreams2.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
kafkaStreams1.setUncaughtExceptionHandler(exception ->
SHUTDOWN_APPLICATION);
kafkaStreams2.setUncaughtExceptionHandler(exception ->
SHUTDOWN_APPLICATION);
@@ -377,8 +352,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
private void testReplaceThreads(final int numThreads) throws Exception {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
-
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 5a59a79ef5a..cc3c3aa9103 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -188,7 +188,6 @@ public class KafkaStreams implements AutoCloseable {
GlobalStreamThread globalStreamThread;
protected StateDirectory stateDirectory = null;
private KafkaStreams.StateListener stateListener;
- private boolean oldHandler;
private BiConsumer<Throwable, Boolean> streamsUncaughtExceptionHandler;
private final Object changeThreadCount = new Object();
@@ -433,32 +432,6 @@ public class KafkaStreams implements AutoCloseable {
}
}
- /**
- * Set the handler invoked when an internal {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread} abruptly
- * terminates due to an uncaught exception.
- *
- * @param uncaughtExceptionHandler the uncaught exception handler for all
internal threads; {@code null} deletes the current handler
- * @throws IllegalStateException if this {@code KafkaStreams} instance has
already been started.
- *
- * @deprecated Since 2.8.0. Use {@link
KafkaStreams#setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)}
instead.
- *
- */
- @Deprecated
- public void setUncaughtExceptionHandler(final
Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
- synchronized (stateLock) {
- if (state.hasNotStarted()) {
- oldHandler = true;
- processStreamThread(thread ->
thread.setUncaughtExceptionHandler(uncaughtExceptionHandler));
-
- if (globalStreamThread != null) {
-
globalStreamThread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
- }
- } else {
- throw new IllegalStateException("Can only set
UncaughtExceptionHandler before calling start(). " +
- "Current state is: " + state);
- }
- }
- }
/**
* Set the handler invoked when an internal {@link
StreamsConfig#NUM_STREAM_THREADS_CONFIG stream thread}
@@ -502,21 +475,6 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private void defaultStreamsUncaughtExceptionHandler(final Throwable
throwable, final boolean skipThreadReplacement) {
- if (oldHandler) {
- threads.remove(Thread.currentThread());
- if (throwable instanceof RuntimeException) {
- throw (RuntimeException) throwable;
- } else if (throwable instanceof Error) {
- throw (Error) throwable;
- } else {
- throw new RuntimeException("Unexpected checked exception
caught in the uncaught exception handler", throwable);
- }
- } else {
- handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT,
skipThreadReplacement);
- }
- }
-
private void replaceStreamThread(final Throwable throwable) {
if (globalStreamThread != null &&
Thread.currentThread().getName().equals(globalStreamThread.getName())) {
log.warn("The global thread cannot be replaced. Reverting to
shutting down the client.");
@@ -540,10 +498,7 @@ public class KafkaStreams implements AutoCloseable {
final
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
final boolean
skipThreadReplacement) {
final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action = streamsUncaughtExceptionHandler.handle(throwable);
- if (oldHandler) {
- log.warn("Stream's new uncaught exception handler is set as well
as the deprecated old handler." +
- "The old handler will be ignored as long as a new handler
is set.");
- }
+
switch (action) {
case REPLACE_THREAD:
if (!skipThreadReplacement) {
@@ -1039,9 +994,7 @@ public class KafkaStreams implements AutoCloseable {
parseHostInfo(applicationConfigs.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)),
logContext
);
-
- oldHandler = false;
- streamsUncaughtExceptionHandler =
this::defaultStreamsUncaughtExceptionHandler;
+ streamsUncaughtExceptionHandler = (throwable, skipThreadReplacement)
-> handleStreamsUncaughtException(throwable, t -> SHUTDOWN_CLIENT,
skipThreadReplacement);
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
delegatingStandbyUpdateListener = new
DelegatingStandbyUpdateListener();
@@ -1062,7 +1015,7 @@ public class KafkaStreams implements AutoCloseable {
time,
globalThreadId,
delegatingStateRestoreListener,
- exception -> defaultStreamsUncaughtExceptionHandler(exception,
false)
+ exception -> handleStreamsUncaughtException(exception, t ->
SHUTDOWN_CLIENT, false)
);
globalThreadState = globalStreamThread.state();
}
@@ -1407,7 +1360,7 @@ public class KafkaStreams implements AutoCloseable {
* However, if you have global stores in your topology, this method blocks
until all global stores are restored.
* As a consequence, any fatal exception that happens during processing is
by default only logged.
* If you want to be notified about dying threads, you can
- * {@link #setUncaughtExceptionHandler(Thread.UncaughtExceptionHandler)
register an uncaught exception handler}
+ * {@link #setUncaughtExceptionHandler(StreamsUncaughtExceptionHandler)
register an uncaught exception handler}
* before starting the {@code KafkaStreams} instance.
* <p>
* Note, for brokers with version {@code 0.9.x} or lower, the broker
version cannot be checked.