This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 5e72e7dbb5b Backport fix from 3.9 (#17716)
5e72e7dbb5b is described below

commit 5e72e7dbb5badb9dcede2202499951e2d6b12f82
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 12 16:01:27 2024 -0500

    Backport fix from 3.9 (#17716)
    
    This is a backport of #17686, which was merged to trunk and cherry-picked to 3.9.
    A standalone PR was needed due to merge conflicts.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../streams/processor/internals/StreamTask.java    |  7 +++++--
 .../processor/internals/StreamTaskTest.java        | 22 +++++++++++++++++-----
 2 files changed, 22 insertions(+), 7 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 163e2c0997e..f4d0499c327 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1003,10 +1003,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
     @Override
     public Map<TopicPartition, Long> purgeableOffsets() {
         final Map<TopicPartition, Long> purgeableConsumedOffsets = new 
HashMap<>();
-        for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
+        for (final Map.Entry<TopicPartition, Long> entry : 
committedOffsets.entrySet()) {
             final TopicPartition tp = entry.getKey();
             if (topology.isRepartitionTopic(tp.topic())) {
-                purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
+                // committedOffsets map is initialized at -1 so no purging 
until there's a committed offset
+                if (entry.getValue() > -1) {
+                    purgeableConsumedOffsets.put(tp, entry.getValue());
+                }
             }
         }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 931f2c75897..44882a8a46a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1778,7 +1778,18 @@ public class StreamTaskTest {
     }
 
     @Test
-    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
+    public void shouldReturnCommittedOffsetsForRepartitionTopicsForPurging() {
+        final Map<TopicPartition, Long> purgeableOffsets = 
doReturnPurgeableOffsets(true);
+        assertThat(purgeableOffsets, equalTo(singletonMap(new 
TopicPartition("repartition", 1), 10L)));
+    }
+
+    @Test
+    public void 
shouldReturnEmptyMapWhenNoCommitForRepartitionTopicsForPurging() {
+        final Map<TopicPartition, Long> purgeableOffsets = 
doReturnPurgeableOffsets(false);
+        assertThat(purgeableOffsets, equalTo(Collections.emptyMap()));
+    }
+
+    private Map<TopicPartition, Long> doReturnPurgeableOffsets(final boolean 
doCommit) {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = withRepartitionTopics(
@@ -1813,7 +1824,7 @@ public class StreamTaskTest {
             context,
             logContext,
             false
-            );
+        );
 
         task.initializeIfNeeded();
         task.completeRestoration(noOpResetter -> { });
@@ -1828,10 +1839,11 @@ public class StreamTaskTest {
         assertTrue(task.process(0L));
 
         task.prepareCommit();
+        if (doCommit) {
+            task.updateCommittedOffsets(repartition, 10L);
+        }
 
-        final Map<TopicPartition, Long> map = task.purgeableOffsets();
-
-        assertThat(map, equalTo(singletonMap(repartition, 11L)));
+        return task.purgeableOffsets();
     }
 
     @Test

Reply via email to