This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 45009ef382 Revert "KAFKA-12887 Skip some RuntimeExceptions from
exception handler (#11228)" (#12421)
45009ef382 is described below
commit 45009ef382145fb6e33c5ebec03600b37a1474c0
Author: Walker Carlson <[email protected]>
AuthorDate: Tue Jul 19 11:17:46 2022 -0500
Revert "KAFKA-12887 Skip some RuntimeExceptions from exception handler
(#11228)" (#12421)
This reverts commit 4835c64f
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 22 +-------
.../integration/EmitOnChangeIntegrationTest.java | 2 +-
...amsUncaughtExceptionHandlerIntegrationTest.java | 61 +++-------------------
.../processor/internals/StreamThreadTest.java | 4 +-
4 files changed, 11 insertions(+), 78 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index a923c8e983..3a61f05de1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -155,9 +155,6 @@ public class KafkaStreams implements AutoCloseable {
private static final String JMX_PREFIX = "kafka.streams";
- private static final Set<Class<? extends Throwable>>
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS =
- new HashSet<>(Arrays.asList(IllegalStateException.class,
IllegalArgumentException.class));
-
// processId is expected to be unique across JVMs and to be used
// in userData of the subscription request to allow assignor be aware
// of the co-location of stream thread's consumers. It is for internal
@@ -515,25 +512,10 @@ public class KafkaStreams implements AutoCloseable {
}
}
- private boolean wrappedExceptionIsIn(final Throwable throwable, final
Set<Class<? extends Throwable>> exceptionsOfInterest) {
- return throwable.getCause() != null &&
exceptionsOfInterest.contains(throwable.getCause().getClass());
- }
-
- private StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
getActionForThrowable(final Throwable throwable,
-
final StreamsUncaughtExceptionHandler
streamsUncaughtExceptionHandler) {
- final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action;
- if (wrappedExceptionIsIn(throwable,
EXCEPTIONS_NOT_TO_BE_HANDLED_BY_USERS)) {
- action = SHUTDOWN_CLIENT;
- } else {
- action = streamsUncaughtExceptionHandler.handle(throwable);
- }
- return action;
- }
-
private void handleStreamsUncaughtException(final Throwable throwable,
final
StreamsUncaughtExceptionHandler streamsUncaughtExceptionHandler,
final boolean
skipThreadReplacement) {
- final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action = getActionForThrowable(throwable, streamsUncaughtExceptionHandler);
+ final StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse
action = streamsUncaughtExceptionHandler.handle(throwable);
if (oldHandler) {
log.warn("Stream's new uncaught exception handler is set as well
as the deprecated old handler." +
"The old handler will be ignored as long as a new handler
is set.");
@@ -549,7 +531,7 @@ public class KafkaStreams implements AutoCloseable {
break;
case SHUTDOWN_CLIENT:
log.error("Encountered the following exception during
processing " +
- "and Kafka Streams opted to " + action + "." +
+ "and the registered exception handler opted to " +
action + "." +
" The streams client is going to shut down now. ",
throwable);
closeToError();
break;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
index 27958730cf..f41c95a6bb 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/EmitOnChangeIntegrationTest.java
@@ -111,7 +111,7 @@ public class EmitOnChangeIntegrationTest {
.toStream()
.map((key, value) -> {
if (shouldThrow.compareAndSet(true, false)) {
- throw new RuntimeException("Kaboom");
+ throw new IllegalStateException("Kaboom");
} else {
return new KeyValue<>(key, value);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index c81ddcfa74..be98e8d9fc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -93,9 +93,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest {
}
public static final Duration DEFAULT_DURATION = Duration.ofSeconds(30);
- private static final AtomicBoolean THROW_ERROR = new AtomicBoolean(true);
- private static final AtomicBoolean THROW_ILLEGAL_STATE_EXCEPTION = new
AtomicBoolean(false);
- private static final AtomicBoolean THROW_ILLEGAL_ARGUMENT_EXCEPTION = new
AtomicBoolean(false);
@Rule
public final TestName testName = new TestName();
@@ -108,6 +105,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
private final String outputTopic2 = "output2" + testId;
private final StreamsBuilder builder = new StreamsBuilder();
private final List<String> processorValueCollector = new ArrayList<>();
+ private static AtomicBoolean throwError = new AtomicBoolean(true);
private final Properties properties = basicProps();
@@ -173,47 +171,6 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
}
}
-
- @Test
- public void shouldShutdownClientWhenIllegalStateException() throws
InterruptedException {
- THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(false, true);
- try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
-
- kafkaStreams.setUncaughtExceptionHandler(exception ->
REPLACE_THREAD); // if the user defined uncaught exception handler would be hit
we would be replacing the thread
-
-
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- produceMessages(0L, inputTopic, "A");
- waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
- assertThat(processorValueCollector.size(), equalTo(1));
- } finally {
- THROW_ILLEGAL_STATE_EXCEPTION.compareAndSet(true, false);
- }
-
- }
-
- @Test
- public void shouldShutdownClientWhenIllegalArgumentException() throws
InterruptedException {
- THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(false, true);
- try (final KafkaStreams kafkaStreams = new
KafkaStreams(builder.build(), properties)) {
- kafkaStreams.setUncaughtExceptionHandler((t, e) -> fail("should
not hit old handler"));
-
- kafkaStreams.setUncaughtExceptionHandler(exception ->
REPLACE_THREAD); // if the user defined uncaught exception handler would be hit
we would be replacing the thread
-
-
StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
-
- produceMessages(0L, inputTopic, "A");
- waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
-
- assertThat(processorValueCollector.size(), equalTo(1));
- } finally {
- THROW_ILLEGAL_ARGUMENT_EXCEPTION.compareAndSet(true, false);
- }
-
- }
-
@Test
public void shouldReplaceThreads() throws InterruptedException {
testReplaceThreads(2);
@@ -365,16 +322,10 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
@Override
public void process(final String key, final String value) {
valueList.add(value + " " + context.taskId());
- if (THROW_ERROR.get()) {
- if (THROW_ILLEGAL_STATE_EXCEPTION.get()) {
- throw new IllegalStateException("Something unexpected
happened in " + Thread.currentThread().getName());
- } else if (THROW_ILLEGAL_ARGUMENT_EXCEPTION.get()) {
- throw new IllegalArgumentException("Something unexpected
happened in " + Thread.currentThread().getName());
- } else {
- throw new
StreamsException(Thread.currentThread().getName());
- }
+ if (throwError.get()) {
+ throw new StreamsException(Thread.currentThread().getName());
}
- THROW_ERROR.set(true);
+ throwError.set(true);
}
}
@@ -408,7 +359,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
final AtomicInteger count = new AtomicInteger();
kafkaStreams.setUncaughtExceptionHandler(exception -> {
if (count.incrementAndGet() == numThreads) {
- THROW_ERROR.set(false);
+ throwError.set(false);
}
return REPLACE_THREAD;
});
@@ -416,7 +367,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
produceMessages(0L, inputTopic, "A");
TestUtils.waitForCondition(() -> count.get() == numThreads,
"finished replacing threads");
- TestUtils.waitForCondition(() -> THROW_ERROR.get(), "finished
replacing threads");
+ TestUtils.waitForCondition(() -> throwError.get(), "finished
replacing threads");
kafkaStreams.close();
waitForApplicationState(Collections.singletonList(kafkaStreams),
KafkaStreams.State.NOT_RUNNING, DEFAULT_DURATION);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 5d898c345b..a43b0793a2 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -2362,9 +2362,9 @@ public class StreamThreadTest {
expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
consumer.subscribe((Collection<String>) anyObject(), anyObject());
- EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expectLastCall().anyTimes();
consumer.unsubscribe();
- EasyMock.expectLastCall().atLeastOnce();
+ EasyMock.expectLastCall().anyTimes();
EasyMock.replay(consumerGroupMetadata);
final Task task1 = mock(Task.class);
final Task task2 = mock(Task.class);