This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32f03a06be3 KAFKA-17474 fix state transition in GlobalStreamThread (#17078)
32f03a06be3 is described below
commit 32f03a06be32e56cd2e4a6527482713bd35cac42
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Sep 3 23:48:51 2024 -0700
KAFKA-17474 fix state transition in GlobalStreamThread (#17078)
KAFKA-17100 changed the behavior of GlobalStreamThread and introduced a race
condition in its state changes, which was exposed by flaky tests in
GlobalStreamThreadTest.
This PR fixes the race by moving the setState(RUNNING) call so that it runs
immediately before the state consumer is returned.
Reviewers: Bill Bejeck <[email protected]>, Chia-Ping Tsai <[email protected]>
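
For illustration only, here is a minimal sketch of the pattern the change appears to follow: the transition to RUNNING happens inside the initialization step, right before the consumer is handed back, so a caller never holds the consumer while the thread is still in its pre-running state. The class `StateTransitionSketch`, the `Worker` type, the `initialize()` method name, and the transition table are all assumptions made for this example; this is not the actual GlobalStreamThread code.

```java
public class StateTransitionSketch {
    // Hypothetical, simplified lifecycle; the real thread may have other states.
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, DEAD }

    static final class Worker {
        private State state = State.CREATED;

        synchronized State state() {
            return state;
        }

        // Applies the transition only if it is allowed from the current state.
        synchronized boolean setState(final State next) {
            final boolean valid;
            switch (state) {
                case CREATED:
                    valid = next == State.RUNNING || next == State.PENDING_SHUTDOWN;
                    break;
                case RUNNING:
                    valid = next == State.PENDING_SHUTDOWN;
                    break;
                case PENDING_SHUTDOWN:
                    valid = next == State.DEAD;
                    break;
                default:
                    valid = false;
            }
            if (valid) {
                state = next;
            }
            return valid;
        }

        // Stand-in for the initialization step: the state becomes RUNNING
        // before the (placeholder) consumer is returned to the caller.
        String initialize() {
            final String stateConsumer = "state-consumer";
            setState(State.RUNNING);
            return stateConsumer;
        }
    }

    public static void main(final String[] args) {
        final Worker worker = new Worker();
        final String consumer = worker.initialize();
        System.out.println(consumer + " " + worker.state()); // prints "state-consumer RUNNING"
    }
}
```

In this sketch there is no point at which the caller holds the consumer but `state()` still reports CREATED, which is the ordering the moved call seems intended to guarantee.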
---
.../apache/kafka/streams/processor/internals/GlobalStreamThread.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index d064603e0ba..044e0029b86 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -288,7 +288,6 @@ public class GlobalStreamThread extends Thread {
return;
}
- setState(RUNNING);
boolean wipeStateStore = false;
try {
@@ -423,6 +422,7 @@ public class GlobalStreamThread extends Thread {
);
}
+ setState(RUNNING);
return stateConsumer;
} catch (final StreamsException fatalException) {
closeStateConsumer(stateConsumer, false);