This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 243a308e4bd KAFKA-19724: Global stream thread should not ignore any
exceptions (#20668)
243a308e4bd is described below
commit 243a308e4bd90ffa52d74efe5b35ad222da97395
Author: Fatih <[email protected]>
AuthorDate: Fri Oct 17 08:19:21 2025 +0300
KAFKA-19724: Global stream thread should not ignore any exceptions (#20668)
Kafka Streams does not catch `Error` types that are thrown during
`GlobalStreamThread` initialization. Because such errors are neither
caught nor logged, it is not possible to trace them (for example, an
`ExceptionInInitializerError` occurs when RocksDB is not found for a
global store).
The catch block in `GlobalStreamThread#initialize()` now catches
`Throwable` instead of `Exception`. Additionally, the empty
`setUncaughtExceptionHandler` call on the global stream thread in
`KafkaStreams` has been removed, because it prevented this change from
taking effect when users employed `setUncaughtExceptionHandler`.
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 5 ---
.../processor/internals/GlobalStreamThread.java | 4 +-
.../internals/GlobalStreamThreadTest.java | 52 +++++++++++++++++++---
3 files changed, 48 insertions(+), 13 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 3c6f4975cb8..656efc8bfe9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -470,11 +470,6 @@ public class KafkaStreams implements AutoCloseable {
}
processStreamThread(thread ->
thread.setUncaughtExceptionHandler((t, e) -> { }
));
-
- if (globalStreamThread != null) {
- globalStreamThread.setUncaughtExceptionHandler((t, e) -> {
}
- );
- }
} else {
throw new IllegalStateException("Can only set
UncaughtExceptionHandler before calling start(). " +
"Current state is: " + state);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 173ebdb8d4b..1025adc6744 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -239,7 +239,7 @@ public class GlobalStreamThread extends Thread {
}
/**
- * @throws IllegalStateException If store gets registered after
initialized is already finished
+ * @throws IllegalStateException If a store gets registered after
initialized is already finished
* @throws StreamsException if the store's change log does not
contain the partition
*/
void initialize() {
@@ -431,7 +431,7 @@ public class GlobalStreamThread extends Thread {
} catch (final StreamsException fatalException) {
closeStateConsumer(stateConsumer, false);
startupException = fatalException;
- } catch (final Exception fatalException) {
+ } catch (final Throwable fatalException) {
closeStateConsumer(stateConsumer, false);
startupException = new StreamsException("Exception caught during
initialization of GlobalStreamThread", fatalException);
} finally {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 419d6b01013..d964b0c80b0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -124,12 +124,7 @@ public class GlobalStreamThreadTest {
);
baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
- final HashMap<String, Object> properties = new HashMap<>();
- properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
- properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
- properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
- properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName());
- properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName());
+ final HashMap<String, Object> properties = getStreamProperties();
config = new StreamsConfig(properties);
globalStreamThread = new GlobalStreamThread(
builder.rewriteTopology(config).buildGlobalStateTopology(),
@@ -409,6 +404,51 @@ public class GlobalStreamThreadTest {
}
}
+ @Test
+ public void shouldThrowStreamsExceptionOnStartupIfThrowableOccurred()
throws Exception {
+ final String exceptionMessage = "Throwable occurred!";
+ final MockConsumer<byte[], byte[]> consumer = new
MockConsumer<>(AutoOffsetResetStrategy.EARLIEST.name()) {
+ @Override
+ public List<PartitionInfo> partitionsFor(final String topic) {
+ throw new ExceptionInInitializerError(exceptionMessage);
+ }
+ };
+ final StateStore globalStore =
builder.globalStateStores().get(GLOBAL_STORE_NAME);
+ globalStreamThread = new GlobalStreamThread(
+ builder.buildGlobalStateTopology(),
+ config,
+ consumer,
+ new StateDirectory(config, time, true, false),
+ 0,
+ new StreamsMetricsImpl(new Metrics(), "test-client",
"processId", time),
+ time,
+ "clientId",
+ stateRestoreListener,
+ e -> { }
+ );
+
+ try {
+ globalStreamThread.start();
+ fail("Should have thrown StreamsException if start up failed");
+ } catch (final StreamsException e) {
+ assertThat(e.getCause(), instanceOf(Throwable.class));
+ assertThat(e.getCause().getMessage(), equalTo(exceptionMessage));
+ }
+ globalStreamThread.join();
+ assertThat(globalStore.isOpen(), is(false));
+ assertFalse(globalStreamThread.stillRunning());
+ }
+
+ private HashMap<String, Object> getStreamProperties() {
+ final HashMap<String, Object> properties = new HashMap<>();
+ properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
+ properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
+ properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
+ properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName());
+ properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.ByteArraySerde.class.getName());
+ return properties;
+ }
+
private void initializeConsumer() {
mockConsumer.updatePartitions(
GLOBAL_STORE_TOPIC_NAME,