This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22d520adae1 KAFKA-19943: Implement state store clean up on start
(#21566)
22d520adae1 is described below
commit 22d520adae1211d6b0939acb06e3bf9bf50696be
Author: Uladzislau Blok <[email protected]>
AuthorDate: Mon Mar 9 19:12:47 2026 +0100
KAFKA-19943: Implement state store clean up on start (#21566)
Implementation for KIP-1259
Adds a new config, `state.cleanup.dir.max.age.ms`, to set a maximum state directory age, and removes state directories on startup if
`current time - max age > last modified time`.
Reviewers: Matthias J. Sax <[email protected]>
---
docs/streams/developer-guide/config-streams.md | 20 +++++++++--
docs/streams/upgrade-guide.md | 2 ++
.../org/apache/kafka/streams/KafkaStreams.java | 5 +++
.../org/apache/kafka/streams/StreamsConfig.java | 11 ++++++
.../processor/internals/StateDirectory.java | 40 ++++++++++++++++++++++
.../org/apache/kafka/streams/KafkaStreamsTest.java | 37 ++++++++++++++++++++
.../processor/internals/StateDirectoryTest.java | 36 +++++++++++++++++++
7 files changed, 149 insertions(+), 2 deletions(-)
diff --git a/docs/streams/developer-guide/config-streams.md
b/docs/streams/developer-guide/config-streams.md
index 57662eb58b9..265e58f9c20 100644
--- a/docs/streams/developer-guide/config-streams.md
+++ b/docs/streams/developer-guide/config-streams.md
@@ -1039,8 +1039,24 @@ The amount of time in milliseconds to wait before
deleting state when a partitio
</td>
<td>
-`600000`
-</td> (10 minutes)
+`600000` (10 minutes)
+</td> </tr>
+<tr>
+<td>
+
+state.cleanup.dir.max.age.ms
+</td>
+<td>
+
+Low
+</td>
+<td>
+
+Time-based threshold, in milliseconds, for purging local state directories and checkpoint files during application startup. State directories that have not been modified for longer than `state.cleanup.dir.max.age.ms` will be removed. The default value of `-1` disables this cleanup.
+</td>
+<td>
+
+`-1` (Disabled)
</td> </tr>
<tr>
<td>
diff --git a/docs/streams/upgrade-guide.md b/docs/streams/upgrade-guide.md
index 072f72fd34a..afdf1e5c497 100644
--- a/docs/streams/upgrade-guide.md
+++ b/docs/streams/upgrade-guide.md
@@ -69,6 +69,8 @@ Kafka Streams now supports `ProcessingExceptionHandler` for
global store/KTable
The streams thread metrics `commit-ratio`, `process-ratio`, `punctuate-ratio`,
and `poll-ratio`, along with streams state updater metrics
`active-restore-ratio`, `standby-restore-ratio`, `idle-ratio`, and
`checkpoint-ratio` have been updated. Each metric now reports, over a rolling
measurement window, the ratio of time this thread spends performing the given
action (`{action}`) to the total elapsed time in that window. The effective
window duration is determined by the metrics configurat [...]
+Kafka Streams can now purge local state directories and checkpoint files during application startup if they have not been modified for a certain period of time. This can be configured via the new `state.cleanup.dir.max.age.ms` config. More details can be found in [KIP-1259](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1259%3A+Add+configuration+to+wipe+Kafka+Streams+local+state+on+startup).
+
### Deprecation of streams-scala module (KIP-1244)
The `kafka-streams-scala` module (`org.apache.kafka.streams.scala` package) is
deprecated in 4.3.0 and will be removed in 5.0.
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index b00623b32bb..7adb5ac73c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1388,6 +1388,11 @@ public class KafkaStreams implements AutoCloseable {
*/
public synchronized void start() throws IllegalStateException,
StreamsException {
if (setState(State.REBALANCING)) {
+ final Long dirMaxAgeMs =
applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG);
+ if (dirMaxAgeMs !=
StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED) {
+ log.debug("Start cleaning outdated directories");
+ stateDirectory.cleanOutdatedDirsOnStartup(dirMaxAgeMs);
+ }
log.debug("Initializing store offsets for existing local state");
stateDirectory.initializeStartupStores(topologyMetadata,
logContext, streamsMetrics);
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index f2f402034c9..7ad19d3562c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -173,6 +173,7 @@ public class StreamsConfig extends AbstractConfig {
public static final int DUMMY_THREAD_INDEX = 1;
public static final long MAX_TASK_IDLE_MS_DISABLED = -1;
+ public static final long STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED = -1;
// We impose these limitations because client tags are encoded into the
subscription info,
// which is part of the group metadata message that is persisted into the
internal topic.
@@ -767,6 +768,11 @@ public class StreamsConfig extends AbstractConfig {
public static final String STATE_CLEANUP_DELAY_MS_CONFIG =
"state.cleanup.delay.ms";
private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of
time in milliseconds to wait before deleting state when a partition has
migrated. Only state directories that have not been modified for at least
<code>state.cleanup.delay.ms</code> will be removed";
+    /** {@code state.cleanup.dir.max.age.ms} */
+    @SuppressWarnings("WeakerAccess")
+    public static final String STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG = "state.cleanup.dir.max.age.ms";
+    // Describes the unit, the strict "longer than" comparison used by the cleanup, and the disabled default (-1).
+    private static final String STATE_CLEANUP_DIR_MAX_AGE_MS_DOC = "Time-based threshold for purging local state directories and checkpoint files during application startup. " +
+        "State directories that have not been modified for longer than <code>" + STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG + "</code> milliseconds will be removed. " +
+        "The default value of <code>-1</code> disables this cleanup.";
+
/** {@code state.dir} */
@SuppressWarnings("WeakerAccess")
public static final String STATE_DIR_CONFIG = "state.dir";
@@ -1266,6 +1272,11 @@ public class StreamsConfig extends AbstractConfig {
10 * 60 * 1000L,
Importance.LOW,
STATE_CLEANUP_DELAY_MS_DOC)
+ .define(STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG,
+ Type.LONG,
+ STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED,
+ Importance.LOW,
+ STATE_CLEANUP_DIR_MAX_AGE_MS_DOC)
.define(UPGRADE_FROM_CONFIG,
Type.STRING,
null,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 701ac8195d7..5b37c1bf427 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -646,6 +646,46 @@ public class StateDirectory implements AutoCloseable {
maybeCleanEmptyNamedTopologyDirs(true);
}
+    /**
+     * Purges outdated local task directories (including the state stores and checkpoint files
+     * inside them) during application startup.
+     *
+     * <p>A task directory is deleted if {@code now - lastModified > dirMaxAgeMs}, where {@code now}
+     * is read once before iterating so that every directory is judged against the same cutoff.
+     * A negative {@code dirMaxAgeMs} is treated as "cleanup disabled": otherwise the cutoff would
+     * lie in the future and every task directory would be wiped.
+     *
+     * @param dirMaxAgeMs the time-based threshold in milliseconds, corresponding to the
+     *                    {@code state.cleanup.dir.max.age.ms} property
+     * @throws StreamsException wrapping the first failure if any directory could not be deleted;
+     *                          deletion of the remaining directories is still attempted
+     */
+    public synchronized void cleanOutdatedDirsOnStartup(final long dirMaxAgeMs) {
+        if (dirMaxAgeMs < 0) {
+            log.warn("Skipping cleanup of outdated state directories because the max directory age ({}ms) is negative.",
+                dirMaxAgeMs);
+            return;
+        }
+        try {
+            cleanStateAndTaskDirectoriesOnStartup(dirMaxAgeMs);
+        } catch (final Exception e) {
+            throw new StreamsException(e);
+        }
+    }
+
+    // Deletes every task directory older than dirMaxAgeMs; rethrows the first IOException after trying all of them.
+    private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) throws IOException {
+        final long now = time.milliseconds();
+        IOException firstException = null;
+        for (final TaskDirectory taskDir : listAllTaskDirectories()) {
+            final File dir = taskDir.file();
+            final String dirName = dir.getName();
+            final long lastModifiedMs = dir.lastModified();
+            if (now - dirMaxAgeMs > lastModifiedMs) {
+                log.info("Deleting outdated state directory {} as {}ms has elapsed from last update (max directory age is {}ms).",
+                    dirName, now - lastModifiedMs, dirMaxAgeMs);
+                try {
+                    Utils.delete(dir);
+                } catch (final IOException exception) {
+                    log.error("Failed to delete task directory {} with exception:", dirName, exception);
+                    if (firstException == null) {
+                        firstException = exception;
+                    }
+                }
+            }
+        }
+
+        if (firstException != null) {
+            throw firstException;
+        }
+    }
+
/**
* Cleans up any leftover named topology directories that are empty, if
any exist
* @param logExceptionAsWarn if true, an exception will be logged as a
warning
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 28ca83ddb2b..857b53910f2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -126,6 +126,7 @@ import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockConstruction;
import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -1877,6 +1878,42 @@ public class KafkaStreamsTest {
}
}
+    @Test
+    public void shouldCallCleanOnStartupOnlyWhenEnabled() {
+        props.put(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG, 100);
+        prepareStreamsAndThreadsForStateCleanupTest();
+
+        // Resources close in reverse order: KafkaStreams first, then the StateDirectory construction mock.
+        try (final MockedConstruction<StateDirectory> stateDirectories = mockStateDirectoryConstructionForStateCleanupTest();
+             final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertEquals(1, stateDirectories.constructed().size());
+            final StateDirectory mockedStateDirectory = stateDirectories.constructed().get(0);
+
+            streams.start();
+
+            verify(mockedStateDirectory).cleanOutdatedDirsOnStartup(100);
+        }
+    }
+
+    @Test
+    public void shouldNotCallCleanOnStartupByDefault() {
+        prepareStreamsAndThreadsForStateCleanupTest();
+
+        try (final MockedConstruction<StateDirectory> stateDirectories = mockStateDirectoryConstructionForStateCleanupTest();
+             final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time)) {
+            assertEquals(1, stateDirectories.constructed().size());
+            final StateDirectory mockedStateDirectory = stateDirectories.constructed().get(0);
+
+            streams.start();
+
+            verify(mockedStateDirectory, never()).cleanOutdatedDirsOnStartup(anyLong());
+        }
+    }
+
+    // Shared setup for the startup-cleanup tests: mocked streams plus two mocked stream threads.
+    private void prepareStreamsAndThreadsForStateCleanupTest() {
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        prepareStreamThread(streamThreadTwo, 2);
+    }
+
+    // Intercepts construction of the StateDirectory created by KafkaStreams so calls on it can be verified.
+    private static MockedConstruction<StateDirectory> mockStateDirectoryConstructionForStateCleanupTest() {
+        return mockConstruction(StateDirectory.class,
+            (mock, context) -> when(mock.initializeProcessId()).thenReturn(UUID.randomUUID()));
+    }
+
+
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index b89628d225b..6e2250ead51 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -910,6 +910,42 @@ public class StateDirectoryTest {
assertFalse(store.isOpen());
}
+    @Test
+    public void shouldCleanupStateDirectoriesWhenLastModifiedIsLessThanNowMinusMaxDirAge() {
+        final TaskId task0 = new TaskId(0, 0);
+        final TaskId task1 = new TaskId(1, 0);
+        final TaskId task2 = new TaskId(2, 0);
+
+        final long dirMaxAgeMs = 60_000L;
+        // One second past the threshold, so task0/task1 are unambiguously outdated.
+        final long outdatedModifiedTime = time.milliseconds() - dirMaxAgeMs - 1000L;
+
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task0), "store").mkdir());
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task1), "store").mkdir());
+        assertTrue(new File(directory.getOrCreateDirectoryForTask(task2), "store").mkdir());
+
+        // Back-date only task0 and task1; task2 keeps its fresh timestamp and must survive the cleanup.
+        // setLastModified() reports failure via its return value, so assert it instead of failing later on a set mismatch.
+        final File dir0File = new File(appDir, toTaskDirString(task0));
+        assertTrue(dir0File.setLastModified(outdatedModifiedTime));
+        final File dir1File = new File(appDir, toTaskDirString(task1));
+        assertTrue(dir1File.setLastModified(outdatedModifiedTime));
+        final File dir2File = new File(appDir, toTaskDirString(task2));
+
+        final TaskDirectory dir0 = new TaskDirectory(dir0File, null);
+        final TaskDirectory dir1 = new TaskDirectory(dir1File, null);
+        final TaskDirectory dir2 = new TaskDirectory(dir2File, null);
+
+        List<TaskDirectory> files = directory.listAllTaskDirectories();
+        assertEquals(Set.of(dir0, dir1, dir2), new HashSet<>(files));
+        files = directory.listNonEmptyTaskDirectories();
+        assertEquals(Set.of(dir0, dir1, dir2), new HashSet<>(files));
+
+        directory.cleanOutdatedDirsOnStartup(dirMaxAgeMs);
+
+        files = directory.listAllTaskDirectories();
+        assertEquals(Set.of(dir2), new HashSet<>(files));
+        files = directory.listNonEmptyTaskDirectories();
+        assertEquals(Set.of(dir2), new HashSet<>(files));
+    }
+
private StateStore initializeStartupStores(final TaskId taskId, final
boolean createTaskDir) {
directory.initializeProcessId();
final TopologyMetadata metadata = Mockito.mock(TopologyMetadata.class);