This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1ebca7817b5 KAFKA-19539: Kafka Streams should also purge internal
topics based on user commit requests (#20234)
1ebca7817b5 is described below
commit 1ebca7817b537b6503941276a1d654ff5a5ae615
Author: Lan Ding <[email protected]>
AuthorDate: Mon Sep 29 23:26:49 2025 +0800
KAFKA-19539: Kafka Streams should also purge internal topics based on user
commit requests (#20234)
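Repartition topic records should be purged up to the currently committed
offset once the `repartition.purge.interval.ms` duration has passed. This now
also applies to commits triggered by user commit requests, not only to
commits triggered by the commit interval.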
Reviewers: Matthias J. Sax <[email protected]>
---
.../streams/processor/internals/StreamThread.java | 12 ++++----
.../processor/internals/StreamThreadTest.java | 33 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 6 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 649a1ec666c..91511da5ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1837,12 +1837,6 @@ public class StreamThread extends Thread implements
ProcessingThread {
.collect(Collectors.toSet())
);
- if ((now - lastPurgeMs) > purgeTimeMs) {
- // try to purge the committed records for repartition topics
if possible
- taskManager.maybePurgeCommittedRecords();
- lastPurgeMs = now;
- }
-
if (committed == -1) {
log.debug("Unable to commit as we are in the middle of a
rebalance, will try again when it completes.");
} else {
@@ -1853,6 +1847,12 @@ public class StreamThread extends Thread implements
ProcessingThread {
committed = taskManager.maybeCommitActiveTasksPerUserRequested();
}
+ if ((now - lastPurgeMs) > purgeTimeMs) {
+ // try to purge the committed records for repartition topics if
possible
+ taskManager.maybePurgeCommittedRecords();
+ lastPurgeMs = now;
+ }
+
return committed;
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8fcc44993a5..e53474db74a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -624,6 +624,39 @@ public class StreamThreadTest {
verify(taskManager).maybePurgeCommittedRecords();
}
+ @ParameterizedTest
+ @MethodSource("data")
+ public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
+ final long purgeInterval = 1000L;
+ final long commitInterval = Long.MAX_VALUE;
+ final Properties props = configProps(false, stateUpdaterEnabled,
processingThreadsEnabled);
+ props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+ props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
Long.toString(commitInterval));
+ props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG,
Long.toString(purgeInterval));
+
+ final StreamsConfig config = new StreamsConfig(props);
+ @SuppressWarnings("unchecked")
+ final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+ final ConsumerGroupMetadata consumerGroupMetadata =
mock(ConsumerGroupMetadata.class);
+ when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ final TaskManager taskManager = mock(TaskManager.class);
+
+ final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
+ topologyMetadata.buildAndRewriteTopology();
+ thread = buildStreamThread(consumer, taskManager, config,
topologyMetadata);
+
+ thread.setNow(mockTime.milliseconds());
+ thread.maybeCommit();
+
+ mockTime.sleep(purgeInterval + 1);
+
+ thread.setNow(mockTime.milliseconds());
+ thread.maybeCommit();
+
+ verify(taskManager, times(2)).maybePurgeCommittedRecords();
+ }
+
@ParameterizedTest
@MethodSource("data")
public void shouldNotProcessWhenPartitionRevoked(final boolean
stateUpdaterEnabled, final boolean processingThreadsEnabled) {