This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 102de21355e KAFKA-17379: Fix unexpected state transition from ERROR to
PENDING_SHUTDOWN (#18765)
102de21355e is described below
commit 102de21355e465771ddd0ba1464cac389abb0e01
Author: Lucas Brutschy <[email protected]>
AuthorDate: Wed Feb 5 17:09:14 2025 +0100
KAFKA-17379: Fix unexpected state transition from ERROR to PENDING_SHUTDOWN
(#18765)
The exception stack trace shown in the ticket can happen when we are
concurrently closing the producer because of an error and doing a
regular close. This is not a bug in the test, but a real race condition
that can happen.
The sequence is this:
Thread1: Enter PENDING_ERROR
Thread2: Check if state is already ERROR
Thread1: Transition to ERROR
Thread2: Check if state is already PENDING_ERROR
Thread2: Transition to PENDING_SHUTDOWN
One idea to fix this would be to synchronize the sequence performed by
Thread1 using the state lock. However, this would require more changes,
since we cannot use the normal state transition method `setState` while
owning the lock, as it calls user-defined callbacks, which may create
deadlocks. To avoid adding more synchronization, we can also fix it by
first attempting to transition to PENDING_SHUTDOWN, and _then_ checking
whether another thread is already attempting to shut down (states
PENDING_SHUTDOWN, PENDING_ERROR, ERROR, NOT_RUNNING). This is safe
because we never transition from a shutdown state back to a
non-shutdown state.
Reviewers: Matthias J. Sax <[email protected]>
---
.../org/apache/kafka/streams/KafkaStreams.java | 63 +++++++++++-----------
1 file changed, 33 insertions(+), 30 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 79e6af29be7..4ee22f04556 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -109,6 +109,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import static
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
import static
org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
@@ -293,13 +294,14 @@ public class KafkaStreams implements AutoCloseable {
private final Object stateLock = new Object();
protected volatile State state = State.CREATED;
- private boolean waitOnState(final State targetState, final long waitMs) {
+ private boolean waitOnStates(final long waitMs, final State...
targetStates) {
+ final Set<State> targetStateSet = Set.of(targetStates);
final long begin = time.milliseconds();
synchronized (stateLock) {
boolean interrupted = false;
long elapsedMs = 0L;
try {
- while (state != targetState) {
+ while (!targetStateSet.contains(state)) {
if (waitMs > elapsedMs) {
final long remainingMs = waitMs - elapsedMs;
try {
@@ -308,7 +310,11 @@ public class KafkaStreams implements AutoCloseable {
interrupted = true;
}
} else {
- log.debug("Cannot transit to {} within {}ms",
targetState, waitMs);
+ log.debug(
+ "Cannot transit to {} within {}ms",
+
Arrays.stream(targetStates).map(State::toString).collect(Collectors.joining("
or ")),
+ waitMs
+ );
return false;
}
elapsedMs = time.milliseconds() - begin;
@@ -347,8 +353,9 @@ public class KafkaStreams implements AutoCloseable {
} else if (state == State.REBALANCING && newState ==
State.REBALANCING) {
// when the state is already in REBALANCING, it should not
transit to REBALANCING again
return false;
- } else if (state == State.ERROR && (newState ==
State.PENDING_ERROR || newState == State.ERROR)) {
- // when the state is already in ERROR, its transition to
PENDING_ERROR or ERROR (due to consecutive close calls)
+ } else if (state == State.ERROR && (newState ==
State.PENDING_ERROR || newState == State.ERROR || newState ==
State.PENDING_SHUTDOWN)) {
+ // when the state is already in ERROR, its transition attempts
to PENDING_SHUTDOWN, PENDING_ERROR or ERROR will
+ // not throw an exception.
return false;
} else if (state == State.PENDING_ERROR && newState !=
State.ERROR) {
// when the state is already in PENDING_ERROR, all other
transitions than ERROR (due to thread dying) will be
@@ -1543,38 +1550,34 @@ public class KafkaStreams implements AutoCloseable {
timeoutMs = Long.MAX_VALUE;
}
- if (state.hasCompletedShutdown()) {
- log.info("Streams client is already in the terminal {} state, all
resources are closed and the client has stopped.", state);
- return true;
- }
- if (state.isShuttingDown()) {
- log.info("Streams client is in {}, all resources are being closed
and the client will be stopped.", state);
- if (state == State.PENDING_ERROR && waitOnState(State.ERROR,
timeoutMs)) {
- log.info("Streams client stopped to ERROR completely");
+ if (!setState(State.PENDING_SHUTDOWN)) {
+ // Copy the state so that we can atomically check if we are shut
down and act on it (log it)
+ final State immutableStateCopy = state;
+ if (immutableStateCopy.isShuttingDown()) {
+ log.info("Skipping shutdown since Streams client is already in
{}, waiting for a terminal state", immutableStateCopy);
+ if (!waitOnStates(timeoutMs, State.ERROR, State.NOT_RUNNING)) {
+ log.warn("Streams client did transition to a terminal
state (ERROR or NOT_RUNNING) within the {}ms timeout", timeoutMs);
+ return false;
+ }
+ log.info("Streams client stopped completely and transitioned
to the terminal {} state", state);
return true;
- } else if (state == State.PENDING_SHUTDOWN &&
waitOnState(State.NOT_RUNNING, timeoutMs)) {
- log.info("Streams client stopped to NOT_RUNNING completely");
+ }
+
+ if (state.hasCompletedShutdown()) {
+ log.info("Skipping shutdown since Streams client is already in
the terminal {} state", state);
return true;
- } else {
- log.warn("Streams client cannot transition to {} completely
within the timeout",
- state == State.PENDING_SHUTDOWN ? State.NOT_RUNNING :
State.ERROR);
- return false;
}
- }
- if (!setState(State.PENDING_SHUTDOWN)) {
- // if we can't transition to PENDING_SHUTDOWN but not because
we're already shutting down, then it must be fatal
- log.error("Failed to transition to PENDING_SHUTDOWN, current state
is {}", state);
- throw new StreamsException("Failed to shut down while in state " +
state);
- } else {
+ throw new IllegalStateException("If transitioning to
PENDING_SHUTDOWN fails, the state should be either in "
+ + "PENDING_SHUTDOWN, PENDING_ERROR, ERROR, or NOT_RUNNING");
+ }
- final Thread shutdownThread = shutdownHelper(false, timeoutMs,
leaveGroup);
+ final Thread shutdownThread = shutdownHelper(false, timeoutMs,
leaveGroup);
- shutdownThread.setDaemon(true);
- shutdownThread.start();
- }
+ shutdownThread.setDaemon(true);
+ shutdownThread.start();
- if (waitOnState(State.NOT_RUNNING, timeoutMs)) {
+ if (waitOnStates(timeoutMs, State.NOT_RUNNING)) {
log.info("Streams client stopped completely");
return true;
} else {