This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 4df04416f78 KAFKA-19724: Global stream thread should not ignore any 
exceptions (#20668)
4df04416f78 is described below

commit 4df04416f78621c0f8fefb6f79498f6ecc9c7de6
Author: Fatih <[email protected]>
AuthorDate: Fri Oct 17 08:19:21 2025 +0300

    KAFKA-19724: Global stream thread should not ignore any exceptions (#20668)
    
    Kafka Streams does not catch `Error` types that occur during
    `GlobalStreamThread` initialization, and therefore it is not possible
    to trace the error (for example, an `ExceptionInInitializerError`
    occurs when RocksDB is not found for a global store). This is because
    errors are not caught and logged.
    
    The catch block in `GlobalStreamThread#initialize()` now catches
    `Throwable` instead of `Exception`. Additionally, the empty
    uncaught-exception handler that `KafkaStreams` installed on the global
    stream thread when users called `setUncaughtExceptionHandler`, which
    prevented this from taking effect, has been removed.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../org/apache/kafka/streams/KafkaStreams.java     |  5 ---
 .../processor/internals/GlobalStreamThread.java    |  4 +-
 .../internals/GlobalStreamThreadTest.java          | 52 +++++++++++++++++++---
 3 files changed, 48 insertions(+), 13 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java 
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index e9496c0cdc6..b80d22c9635 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -476,11 +476,6 @@ public class KafkaStreams implements AutoCloseable {
                 }
                 processStreamThread(thread -> 
thread.setUncaughtExceptionHandler((t, e) -> { }
                 ));
-
-                if (globalStreamThread != null) {
-                    globalStreamThread.setUncaughtExceptionHandler((t, e) -> { 
}
-                    );
-                }
             } else {
                 throw new IllegalStateException("Can only set 
UncaughtExceptionHandler before calling start(). " +
                     "Current state is: " + state);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index 1c7194b1913..fdddb42a835 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -243,7 +243,7 @@ public class GlobalStreamThread extends Thread {
         }
 
         /**
-         * @throws IllegalStateException If store gets registered after 
initialized is already finished
+         * @throws IllegalStateException If a store gets registered after 
initialized is already finished
          * @throws StreamsException      if the store's change log does not 
contain the partition
          */
         void initialize() {
@@ -433,7 +433,7 @@ public class GlobalStreamThread extends Thread {
         } catch (final StreamsException fatalException) {
             closeStateConsumer(stateConsumer, false);
             startupException = fatalException;
-        } catch (final Exception fatalException) {
+        } catch (final Throwable fatalException) {
             closeStateConsumer(stateConsumer, false);
             startupException = new StreamsException("Exception caught during 
initialization of GlobalStreamThread", fatalException);
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
index 73b610227a5..3a8cdf63f7c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
@@ -121,12 +121,7 @@ public class GlobalStreamThreadTest {
         );
 
         baseDirectoryName = TestUtils.tempDirectory().getAbsolutePath();
-        final HashMap<String, Object> properties = new HashMap<>();
-        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
-        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
-        properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
-        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName());
-        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName());
+        final HashMap<String, Object> properties = getStreamProperties();
         config = new StreamsConfig(properties);
         globalStreamThread = new GlobalStreamThread(
             builder.rewriteTopology(config).buildGlobalStateTopology(),
@@ -406,6 +401,51 @@ public class GlobalStreamThreadTest {
         }
     }
 
+    @Test
+    public void shouldThrowStreamsExceptionOnStartupIfThrowableOccurred() 
throws Exception {
+        final String exceptionMessage = "Throwable occurred!";
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], 
byte[]>(OffsetResetStrategy.EARLIEST) {
+            @Override
+            public List<PartitionInfo> partitionsFor(final String topic) {
+                throw new ExceptionInInitializerError(exceptionMessage);
+            }
+        };
+        final StateStore globalStore = 
builder.globalStateStores().get(GLOBAL_STORE_NAME);
+        globalStreamThread = new GlobalStreamThread(
+                builder.buildGlobalStateTopology(),
+                config,
+                consumer,
+                new StateDirectory(config, time, true, false),
+                0,
+                new StreamsMetricsImpl(new Metrics(), "test-client", 
"processId", time),
+                time,
+                "clientId",
+                stateRestoreListener,
+                e -> { }
+        );
+
+        try {
+            globalStreamThread.start();
+            fail("Should have thrown StreamsException if start up failed");
+        } catch (final StreamsException e) {
+            assertThat(e.getCause(), instanceOf(Throwable.class));
+            assertThat(e.getCause().getMessage(), equalTo(exceptionMessage));
+        }
+        globalStreamThread.join();
+        assertThat(globalStore.isOpen(), is(false));
+        assertFalse(globalStreamThread.stillRunning());
+    }
+
+    private HashMap<String, Object> getStreamProperties() {
+        final HashMap<String, Object> properties = new HashMap<>();
+        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "blah");
+        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "testAppId");
+        properties.put(StreamsConfig.STATE_DIR_CONFIG, baseDirectoryName);
+        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName());
+        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.ByteArraySerde.class.getName());
+        return properties;
+    }
+
     private void initializeConsumer() {
         mockConsumer.updatePartitions(
             GLOBAL_STORE_TOPIC_NAME,

Reply via email to