This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1ebca7817b5 KAFKA-19539: Kafka Streams should also purge internal topics based on user commit requests (#20234)
1ebca7817b5 is described below

commit 1ebca7817b537b6503941276a1d654ff5a5ae615
Author: Lan Ding <[email protected]>
AuthorDate: Mon Sep 29 23:26:49 2025 +0800

    KAFKA-19539: Kafka Streams should also purge internal topics based on user commit requests (#20234)
    
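    Repartition topic records should be purged up to the currently committed
    offset once the `repartition.purge.interval.ms` interval has elapsed,
    regardless of whether the commit was triggered by `commit.interval.ms`
    or by a user commit request.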
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  | 12 ++++----
 .../processor/internals/StreamThreadTest.java      | 33 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 6 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 649a1ec666c..91511da5ee0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1837,12 +1837,6 @@ public class StreamThread extends Thread implements ProcessingThread {
                     .collect(Collectors.toSet())
             );
 
-            if ((now - lastPurgeMs) > purgeTimeMs) {
-                // try to purge the committed records for repartition topics 
if possible
-                taskManager.maybePurgeCommittedRecords();
-                lastPurgeMs = now;
-            }
-
             if (committed == -1) {
                 log.debug("Unable to commit as we are in the middle of a 
rebalance, will try again when it completes.");
             } else {
@@ -1853,6 +1847,12 @@ public class StreamThread extends Thread implements ProcessingThread {
             committed = taskManager.maybeCommitActiveTasksPerUserRequested();
         }
 
+        if ((now - lastPurgeMs) > purgeTimeMs) {
+            // try to purge the committed records for repartition topics if 
possible
+            taskManager.maybePurgeCommittedRecords();
+            lastPurgeMs = now;
+        }
+
         return committed;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8fcc44993a5..e53474db74a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -624,6 +624,39 @@ public class StreamThreadTest {
         verify(taskManager).maybePurgeCommittedRecords();
     }
 
+    /**
+     * Verifies that repartition-topic purging is driven by the purge interval rather than
+     * the commit interval: with {@code commit.interval.ms} set to {@code Long.MAX_VALUE},
+     * two {@code maybeCommit()} calls separated by more than the purge interval must
+     * together invoke {@code TaskManager#maybePurgeCommittedRecords()} exactly twice.
+     */
+    @ParameterizedTest
+    @MethodSource("data")
+    public void shouldAlsoPurgeBeforeTheCommitInterval(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {
+        final long purgeInterval = 1000L;
+        // A Long.MAX_VALUE commit interval should never elapse within this test, so any
+        // purge observed below has to come from the purge-interval check alone.
+        final long commitInterval = Long.MAX_VALUE;
+        final Properties props = configProps(false, stateUpdaterEnabled, 
processingThreadsEnabled);
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
Long.toString(commitInterval));
+        props.setProperty(StreamsConfig.REPARTITION_PURGE_INTERVAL_MS_CONFIG, 
Long.toString(purgeInterval));
+
+        final StreamsConfig config = new StreamsConfig(props);
+        @SuppressWarnings("unchecked")
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        when(consumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        // Mocked so the test can count maybePurgeCommittedRecords() invocations at the end.
+        final TaskManager taskManager = mock(TaskManager.class);
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        thread = buildStreamThread(consumer, taskManager, config, 
topologyMetadata);
+
+        // First call is expected to purge. NOTE(review): this relies on the thread's initial
+        // lastPurgeMs being more than purgeInterval before mockTime's current value — confirm.
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        // Advance strictly past the interval: the purge check is `(now - lastPurgeMs) > purgeTimeMs`.
+        mockTime.sleep(purgeInterval + 1);
+
+        thread.setNow(mockTime.milliseconds());
+        thread.maybeCommit();
+
+        // One purge per maybeCommit() call, even though no time-based commit interval elapsed.
+        verify(taskManager, times(2)).maybePurgeCommittedRecords();
+    }
+
     @ParameterizedTest
     @MethodSource("data")        
     public void shouldNotProcessWhenPartitionRevoked(final boolean 
stateUpdaterEnabled, final boolean processingThreadsEnabled) {

Reply via email to