This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3d1a16a28f8 KAFKA-20663: Fix startup state manager close to release StateDirectory task lock (#22490)
3d1a16a28f8 is described below
commit 3d1a16a28f8f0036f7404289d99cf5e33447e90d
Author: Nick Telford <[email protected]>
AuthorDate: Sat Jun 6 18:49:59 2026 +0100
KAFKA-20663: Fix startup state manager close to release StateDirectory task
lock (#22490)
## Summary
`initializeStartupStores()` acquires a `StateDirectory` task lock via
`StateManagerUtil.registerStateStores()` but then closes the temporary
state manager by calling `temporaryStateManager.close()` directly —
which is `ProcessorStateManager.close()`. That method closes the RocksDB
stores but has no knowledge of the `StateDirectory` lock and never calls
`stateDirectory.unlock()`. Only `StateManagerUtil.closeStateManager()`
calls `stateDirectory.unlock()`, and it was never invoked for the
startup path.
As a result the main thread holds the `StateDirectory` task lock for
every startup task permanently. At shutdown, when the stream thread
calls `closeStateManager()` for those tasks, `stateDirectory.lock()`
returns `false` (main thread owns the lock, stream thread is the
caller), so `db.close()` is never called. With WAL disabled, the RocksDB
memtables, including the 30-second `maybeCheckpoint()` offset writes,
are lost when the JVM exits. The SST files retain only the offset from
the last data-CF auto-flush, which for an inactive segment can be many
hours old, causing `OffsetOutOfRangeException` on the next restart when
the stale offset falls outside the changelog retention window.
The secondary symptom is the logged error `"Some task directories still
locked while closing state, this indicates unclean shutdown"`:
`StateDirectory.close()` is called from the shutdown helper thread, not
the main thread, so `unlockStartupStores()` cannot release
main-thread-owned locks (`unlock()` requires the calling thread to match
the lock owner).
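
The ownership rule described above can be illustrated with a small toy
model. This is only a sketch: `OwnerLockSketch` and its string task ids
are hypothetical and are not the real `StateDirectory` API. It assumes
only what the text states, namely that a lock held by one thread cannot
be acquired or released by another.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Toy model only: a hypothetical stand-in, not the real StateDirectory.
public class OwnerLockSketch {
    // Maps a task id to the thread that currently owns its lock.
    private final ConcurrentHashMap<String, Thread> owners = new ConcurrentHashMap<>();

    // Succeeds if the task is unlocked or already owned by the calling thread.
    public boolean lock(final String taskId) {
        final Thread current = Thread.currentThread();
        final Thread owner = owners.putIfAbsent(taskId, current);
        return owner == null || owner == current;
    }

    // Releases the lock only when the calling thread is the owner.
    public boolean unlock(final String taskId) {
        return owners.remove(taskId, Thread.currentThread());
    }

    public Thread lockOwner(final String taskId) {
        return owners.get(taskId);
    }

    public static void main(final String[] args) throws InterruptedException {
        final OwnerLockSketch directory = new OwnerLockSketch();
        // The "main" thread takes the lock and never releases it.
        directory.lock("0_0");

        final AtomicBoolean lockedByOther = new AtomicBoolean(true);
        final AtomicBoolean unlockedByOther = new AtomicBoolean(true);
        final Thread other = new Thread(() -> {
            lockedByOther.set(directory.lock("0_0"));
            unlockedByOther.set(directory.unlock("0_0"));
        });
        other.start();
        other.join();

        System.out.println(lockedByOther.get());   // false
        System.out.println(unlockedByOther.get()); // false
        // The original thread is still the owner.
        System.out.println(directory.lockOwner("0_0") == Thread.currentThread()); // true
    }
}
```

In this model, neither a stream thread nor a shutdown helper thread can
make progress on a task whose lock was leaked by the thread that ran
startup initialization.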
The fix replaces `temporaryStateManager.close()` with
`StateManagerUtil.closeStateManager()`, which is the correct counterpart
to `registerStateStores()` — every other call site in the codebase
already pairs them. `closeStateManager()` calls
`ProcessorStateManager.close()` (preserving existing behaviour including
`maybeDowngradeOffsets()`) and then releases the `StateDirectory` lock
in a nested `finally` block.
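
The close-then-unlock pairing can be sketched as follows. This is a
minimal illustration, not the body of
`StateManagerUtil.closeStateManager()`: the `Manager` interface, the
`closeAndUnlock` helper and the event log are hypothetical names
introduced here. The point it shows is that releasing the lock in a
`finally` block guarantees the release even when closing the stores
throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of pairing "close" with "unlock"; not Kafka code.
public class PairedCloseSketch {
    // Records the order in which cleanup steps ran.
    static final List<String> EVENTS = new ArrayList<>();

    // Stand-in for a state manager that owns open stores.
    interface Manager {
        void close();
    }

    // Closes the manager, then always runs the unlock action.
    static void closeAndUnlock(final Manager manager, final Runnable unlock) {
        try {
            manager.close();
        } finally {
            // Runs whether or not close() threw.
            unlock.run();
        }
    }

    public static void main(final String[] args) {
        final Manager failing = () -> {
            EVENTS.add("close");
            throw new IllegalStateException("simulated store close failure");
        };
        try {
            closeAndUnlock(failing, () -> EVENTS.add("unlock"));
        } catch (final IllegalStateException e) {
            EVENTS.add("propagated");
        }
        System.out.println(EVENTS); // [close, unlock, propagated]
    }
}
```

Calling only `manager.close()` here, as the startup path did before
this change, would leave the unlock step unreachable.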
Reviewers: Bill Bejeck <[email protected]>, Eduwer Camacaro
<[email protected]>, Matthias Sax <[email protected]>
---
.../kafka/streams/processor/internals/StateDirectory.java | 8 +++++++-
.../kafka/streams/processor/internals/StateDirectoryTest.java | 11 ++++++++++-
2 files changed, 17 insertions(+), 2 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 2eab9736b4a..bf57e26e5f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.common.utils.internals.LogContext;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
@@ -230,6 +231,7 @@ public class StateDirectory implements AutoCloseable {
final List<TaskDirectory> nonEmptyTaskDirectories = listNonEmptyTaskDirectories();
if (hasPersistentStores && !nonEmptyTaskDirectories.isEmpty()) {
final boolean eosEnabled = StreamsConfigUtils.eosEnabled(config);
+ final boolean transactionalStateStoresEnabled = new TopologyConfig(config).transactionalStateStoresEnabled;
// Initialize thread-specific resources needed to open stores in the state directory
final String threadLogPrefix = String.format("[%s]", Thread.currentThread().getName());
@@ -271,7 +273,11 @@ public class StateDirectory implements AutoCloseable {
} finally {
// Make sure the state manager writes the local checkpoint file before closing the stores
// This will be replaced in the future when removing the checkpoint file dependency.
- temporaryStateManager.close();
+ StateManagerUtil.closeStateManager(
+ log, threadLogPrefix, true, eosEnabled,
+ transactionalStateStoresEnabled,
+ temporaryStateManager, this, Task.TaskType.ACTIVE
+ );
}
tasksInLocalState.add(task);
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index 0f3e6846d24..e2ef1acd0d5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -877,12 +877,21 @@ public class StateDirectoryTest {
assertFalse(directory.removeStartupState(taskId));
}
+ @Test
+ public void shouldNotHoldLockAfterInitializeStartupStores() {
+ final TaskId taskId = new TaskId(0, 0);
+ initializeStartupStores(taskId, true);
+
+ assertTrue(directory.hasStartupTasks());
+ assertNull(directory.lockOwner(taskId));
+ }
+
@Test
public void shouldUnlockStartupStateOnClose() {
final TaskId taskId = new TaskId(0, 0);
initializeStartupStores(taskId, true);
- assertEquals(Thread.currentThread(), directory.lockOwner(taskId));
+ assertNull(directory.lockOwner(taskId));
directory.close();
assertNull(directory.lockOwner(taskId));
}