This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new 9b29f289a89 Backport fix from 3.9 (#17716)
9b29f289a89 is described below
commit 9b29f289a891d9cc953b3fcb40e2a7263c80383a
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 12 16:01:27 2024 -0500
Backport fix from 3.9 (#17716)
This is a backport of #17686 merged to trunk and cherry-picked to 3.9. Need
to do a standalone PR due to merge conflicts.
Reviewers: Matthias Sax <[email protected]>
---
.../streams/processor/internals/StreamTask.java | 7 +++++--
.../processor/internals/StreamTaskTest.java | 22 +++++++++++++++++-----
2 files changed, 22 insertions(+), 7 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 163e2c0997e..f4d0499c327 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -1003,10 +1003,13 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator,
@Override
public Map<TopicPartition, Long> purgeableOffsets() {
        final Map<TopicPartition, Long> purgeableConsumedOffsets = new HashMap<>();
-        for (final Map.Entry<TopicPartition, Long> entry : consumedOffsets.entrySet()) {
+        for (final Map.Entry<TopicPartition, Long> entry : committedOffsets.entrySet()) {
final TopicPartition tp = entry.getKey();
if (topology.isRepartitionTopic(tp.topic())) {
- purgeableConsumedOffsets.put(tp, entry.getValue() + 1);
+                // committedOffsets map is initialized at -1 so no purging until there's a committed offset
+ if (entry.getValue() > -1) {
+ purgeableConsumedOffsets.put(tp, entry.getValue());
+ }
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 31665b2fe82..3cc4be8c73c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -1836,7 +1836,18 @@ public class StreamTaskTest {
}
@Test
- public void shouldReturnOffsetsForRepartitionTopicsForPurging() {
+ public void shouldReturnCommittedOffsetsForRepartitionTopicsForPurging() {
+        final Map<TopicPartition, Long> purgeableOffsets = doReturnPurgeableOffsets(true);
+        assertThat(purgeableOffsets, equalTo(singletonMap(new TopicPartition("repartition", 1), 10L)));
+ }
+
+ @Test
+    public void shouldReturnEmptyMapWhenNoCommitForRepartitionTopicsForPurging() {
+        final Map<TopicPartition, Long> purgeableOffsets = doReturnPurgeableOffsets(false);
+ assertThat(purgeableOffsets, equalTo(Collections.emptyMap()));
+ }
+
+    private Map<TopicPartition, Long> doReturnPurgeableOffsets(final boolean doCommit) {
         final TopicPartition repartition = new TopicPartition("repartition", 1);
final ProcessorTopology topology = withRepartitionTopics(
@@ -1876,7 +1887,7 @@ public class StreamTaskTest {
context,
logContext,
false
- );
+ );
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
@@ -1891,10 +1902,11 @@ public class StreamTaskTest {
assertTrue(task.process(0L));
task.prepareCommit();
+ if (doCommit) {
+ task.updateCommittedOffsets(repartition, 10L);
+ }
- final Map<TopicPartition, Long> map = task.purgeableOffsets();
-
- assertThat(map, equalTo(singletonMap(repartition, 11L)));
+ return task.purgeableOffsets();
}
@Test