This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 0eae7507d54 KAFKA-17635: Ensure only committed offsets are returned
for purging (#17686)
0eae7507d54 is described below
commit 0eae7507d547cfe432a346b50acb1d929480bd01
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Nov 6 17:45:27 2024 -0500
KAFKA-17635: Ensure only committed offsets are returned for purging (#17686)
Kafka Streams actively purges records from repartition topics. Prior to
this PR, Kafka Streams would retrieve the offset from the consumedOffsets map,
but there are a couple of edge cases where the consumedOffsets map can get ahead of
the committedOffsets map. In these cases, Kafka Streams could
potentially purge a repartition record before it's committed.
Updated the existing StreamTask test to cover this case.
Reviewers: Matthias Sax <[email protected]>
---
.../kafka/streams/processor/internals/StreamTask.java | 7 +++++--
.../streams/processor/internals/StreamTaskTest.java | 16 +++++++++++++---
2 files changed, 18 insertions(+), 5 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 6a4e97e4707..76df4693bf6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1074,10 +1074,13 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
@Override
public Map<TopicPartition, Long> purgeableOffsets() {
final Map<TopicPartition, Long> purgeableConsumedOffsets = new
HashMap<>();
- for (final Map.Entry<TopicPartition, Long> entry :
consumedOffsets.entrySet()) {
+ for (final Map.Entry<TopicPartition, Long> entry :
committedOffsets.entrySet()) {
final TopicPartition tp = entry.getKey();
if (topology.isRepartitionTopic(tp.topic())) {
- purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
+ // committedOffsets map is initialized at -1 so no purging
until there's a committed offset
+ if (entry.getValue() > -1) {
+ purgeableConsumedOffsets.put(tp, entry.getValue());
+ }
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 3fa33ef8954..cecfdd35ca1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -78,6 +78,8 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
@@ -1863,8 +1865,9 @@ public class StreamTaskTest {
verify(stateManager).close();
}
- @Test
- public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void shouldMaybeReturnOffsetsForRepartitionTopicsForPurging(final
boolean doCommit) {
when(stateManager.taskId()).thenReturn(taskId);
when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
final TopicPartition repartition = new TopicPartition("repartition",
1);
@@ -1916,10 +1919,17 @@ public class StreamTaskTest {
assertTrue(task.process(0L));
task.prepareCommit();
+ if (doCommit) {
+ task.updateCommittedOffsets(repartition, 10L);
+ }
final Map<TopicPartition, Long> map = task.purgeableOffsets();
- assertThat(map, equalTo(singletonMap(repartition, 11L)));
+ if (doCommit) {
+ assertThat(map, equalTo(singletonMap(repartition, 10L)));
+ } else {
+ assertThat(map, equalTo(Collections.emptyMap()));
+ }
}
@Test