This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 32f03a06be3 KAFKA-17474 fix state transition in GlobalStreamThread (#17078)
32f03a06be3 is described below

commit 32f03a06be32e56cd2e4a6527482713bd35cac42
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Sep 3 23:48:51 2024 -0700

    KAFKA-17474 fix state transition in GlobalStreamThread (#17078)
    
    KAFKA-17100 changed the behavior of GlobalStreamThread, introducing a
    race condition for state changes that was exposed by flaky test
    failures in GlobalStreamThreadTest.
    
    This PR moves the setState(RUNNING) call to just before the state
    consumer is returned, which fixes the race condition.
    
    Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../apache/kafka/streams/processor/internals/GlobalStreamThread.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index d064603e0ba..044e0029b86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -288,7 +288,6 @@ public class GlobalStreamThread extends Thread {
 
             return;
         }
-        setState(RUNNING);
 
         boolean wipeStateStore = false;
         try {
@@ -423,6 +422,7 @@ public class GlobalStreamThread extends Thread {
                 );
             }
 
+            setState(RUNNING);
             return stateConsumer;
         } catch (final StreamsException fatalException) {
             closeStateConsumer(stateConsumer, false);
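
The commit message does not spell out the interleaving behind the race, so the following is only a generic sketch of this class of problem, not the actual GlobalStreamThread code. It assumes a hypothetical worker thread that releases a latch when its initialization finishes, and a caller that reads the state once the latch opens. All names (StateTransitionSketch, Worker, initDone, startAndObserve) are invented for illustration.

```java
import java.util.concurrent.CountDownLatch;

public class StateTransitionSketch {
    enum State { CREATED, RUNNING }

    // Hypothetical worker; this is NOT the real GlobalStreamThread.
    static class Worker extends Thread {
        private volatile State state = State.CREATED;
        private final CountDownLatch initDone = new CountDownLatch(1);
        private final boolean transitionInsideInit;

        Worker(final boolean transitionInsideInit) {
            this.transitionInsideInit = transitionInsideInit;
        }

        private void initialize() {
            try {
                // ... set-up work would happen here ...
                if (transitionInsideInit) {
                    // State is published before the latch is released.
                    state = State.RUNNING;
                }
            } finally {
                // From here on, a waiting caller may proceed.
                initDone.countDown();
            }
        }

        @Override
        public void run() {
            initialize();
            if (!transitionInsideInit) {
                // A caller released by the latch may already have read the state.
                state = State.RUNNING;
            }
        }

        // Starts the thread, waits for initialization, and returns the observed state.
        State startAndObserve() {
            start();
            try {
                initDone.await();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return state;
        }
    }

    public static void main(final String[] args) {
        System.out.println("transition inside init: " + new Worker(true).startAndObserve());
        System.out.println("transition after init:  " + new Worker(false).startAndObserve());
    }
}
```

With the transition inside initialization, the write to state happens before countDown(), so a caller returning from await() observes RUNNING. With the transition after initialization, the observed state depends on thread scheduling and may be either CREATED or RUNNING.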
