This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 0eae7507d54 KAFKA-17635: Ensure only committed offsets are returned for purging (#17686)
0eae7507d54 is described below

commit 0eae7507d547cfe432a346b50acb1d929480bd01
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 6 17:45:27 2024 -0500

    KAFKA-17635: Ensure only committed offsets are returned for purging (#17686)
    
    Kafka Streams actively purges records from repartition topics. Prior to
    this PR, Kafka Streams would retrieve the offset from the consumedOffsets map,
    but there are a couple of edge cases where the consumedOffsets map can get
    ahead of the committedOffsets map. In these cases, Kafka Streams could
    purge a repartition record before its offset has been committed. This PR
    computes purgeable offsets from the committedOffsets map instead, and skips
    partitions that do not yet have a committed offset.
    
    Updated the existing StreamTask test to cover both the committed and the not-yet-committed cases.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../kafka/streams/processor/internals/StreamTask.java    |  7 +++++--
 .../streams/processor/internals/StreamTaskTest.java      | 16 +++++++++++++---
 2 files changed, 18 insertions(+), 5 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6a4e97e4707..76df4693bf6 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1074,10 +1074,13 @@ public class StreamTask extends AbstractTask implements 
ProcessorNodePunctuator,
     @Override
     public Map<TopicPartition, Long> purgeableOffsets() {
         final Map<TopicPartition, Long> purgeableConsumedOffsets = new 
HashMap<>();
-        for (final Map.Entry<TopicPartition, Long> entry : 
consumedOffsets.entrySet()) {
+        for (final Map.Entry<TopicPartition, Long> entry : 
committedOffsets.entrySet()) {
             final TopicPartition tp = entry.getKey();
             if (topology.isRepartitionTopic(tp.topic())) {
-                purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
+                // committedOffsets map is initialized at -1 so no purging 
until there's a committed offset
+                if (entry.getValue() > -1) {
+                    purgeableConsumedOffsets.put(tp, entry.getValue());
+                }
             }
         }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3fa33ef8954..cecfdd35ca1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,6 +78,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.InOrder;
 import org.mockito.Mock;
 import org.mockito.junit.jupiter.MockitoExtension;
@@ -1863,8 +1865,9 @@ public class StreamTaskTest {
         verify(stateManager).close();
     }
 
-    @Test
-    public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final 
boolean doCommit) {
         when(stateManager.taskId()).thenReturn(taskId);
         when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
@@ -1916,10 +1919,17 @@ public class StreamTaskTest {
         assertTrue(task.process(0L));
 
         task.prepareCommit();
+        if (doCommit) {
+            task.updateCommittedOffsets(repartition, 10L);
+        }
 
         final Map<TopicPartition, Long> map = task.purgeableOffsets();
 
-        assertThat(map, equalTo(singletonMap(repartition, 11L)));
+        if (doCommit) {
+            assertThat(map, equalTo(singletonMap(repartition, 10L)));
+        } else {
+            assertThat(map, equalTo(Collections.emptyMap()));
+        }
     }
 
     @Test

Reply via email to