This is an automated email from the ASF dual-hosted git repository.
AndrewJSchofield pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 27689488232 KAFKA-20575: Add onPartitionsLost support for MockConsume
(#22273)
27689488232 is described below
commit 2768948823287b81fba91d7c071caf2d4839fe4f
Author: Aditya Kousik <[email protected]>
AuthorDate: Fri May 15 02:40:18 2026 -0700
KAFKA-20575: Add onPartitionsLost support for MockConsume (#22273)
MockConsumer.rebalance() only supports graceful path with
onPartitionsRevoked. There is no way to test partitions lost due to
session timeout/failed heartbeat etc
Reviewers: Andrew Schofield <[email protected]>
---
.../kafka/clients/consumer/MockConsumer.java | 28 ++++++
.../kafka/clients/consumer/MockConsumerTest.java | 105 +++++++++++++++++++++
2 files changed, 133 insertions(+)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 9702cd24220..2a3e0cdc53d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -142,6 +142,34 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
this.subscriptions.rebalanceListener().ifPresent(crl ->
crl.onPartitionsAssigned(added));
}
+ /**
+ * Simulates a partition loss event. Calls {@link
ConsumerRebalanceListener#onPartitionsLost}
+ * for the specified partitions and removes them from the current
assignment. Unlike
+ * {@link #rebalance(Collection)}, which calls {@link
ConsumerRebalanceListener#onPartitionsRevoked},
+ * this method models the case where the consumer loses partitions without
a graceful revoke..
+ *
+ * <p>Only records belonging to the lost partitions are cleared; records
for retained
+ * partitions are unaffected.
+ *
+ * @param partitionsLost the partitions to lose; all must be currently
assigned
+ * @throws IllegalStateException if any partition is not currently assigned
+ */
+ public synchronized void losePartitions(Collection<TopicPartition>
partitionsLost) {
+ Set<TopicPartition> currentAssignment =
this.subscriptions.assignedPartitions();
+ Set<TopicPartition> lost = new HashSet<>(partitionsLost);
+ List<TopicPartition> notAssigned = lost.stream()
+ .filter(tp -> !currentAssignment.contains(tp))
+ .collect(Collectors.toList());
+ if (!notAssigned.isEmpty())
+ throw new IllegalStateException("Cannot lose partitions that are
not currently assigned: " + notAssigned);
+ lost.forEach(records::remove);
+ this.subscriptions.rebalanceListener().ifPresent(crl ->
crl.onPartitionsLost(lost));
+ Set<TopicPartition> remaining = currentAssignment.stream()
+ .filter(tp -> !lost.contains(tp))
+ .collect(Collectors.toSet());
+ this.subscriptions.assignFromSubscribed(remaining);
+ }
+
@Override
public synchronized Set<String> subscription() {
return this.subscriptions.subscription();
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
index 6968b45a57b..d3fa859cf9f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java
@@ -32,6 +32,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.IntStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -230,4 +231,108 @@ public class MockConsumerTest {
assertTrue(records.isEmpty());
}
+ @Test
+ public void testLosePartitionsCallsOnPartitionsLost() {
+ TopicPartition tp0 = new TopicPartition("test", 0);
+ TopicPartition tp1 = new TopicPartition("test", 1);
+ List<TopicPartition> assigned = List.of(tp0, tp1);
+
+ List<TopicPartition> lost = new ArrayList<>();
+ List<TopicPartition> revoked = new ArrayList<>();
+ consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {
+ revoked.addAll(partitions);
+ }
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {}
+ @Override
+ public void onPartitionsLost(Collection<TopicPartition>
partitions) {
+ lost.addAll(partitions);
+ }
+ });
+
+ consumer.rebalance(assigned);
+ consumer.losePartitions(List.of(tp0));
+
+ assertEquals(List.of(tp0), lost);
+ assertTrue(revoked.isEmpty());
+ }
+
+ @Test
+ public void testLosePartitionsRemovesFromAssignment() {
+ TopicPartition tp0 = new TopicPartition("test", 0);
+ TopicPartition tp1 = new TopicPartition("test", 1);
+
+ consumer.subscribe(List.of("test"));
+ consumer.rebalance(List.of(tp0, tp1));
+ consumer.losePartitions(List.of(tp0));
+
+ assertFalse(consumer.assignment().contains(tp0));
+ assertTrue(consumer.assignment().contains(tp1));
+ }
+
+ @Test
+ public void testLosePartitionsThrowsIfNotAssigned() {
+ TopicPartition tp0 = new TopicPartition("test", 0);
+ TopicPartition tp1 = new TopicPartition("test", 1);
+
+ consumer.subscribe(List.of("test"));
+ consumer.rebalance(List.of(tp0));
+
+ assertThrows(IllegalStateException.class,
+ () -> consumer.losePartitions(List.of(tp1)));
+ }
+
+ @Test
+ public void testLosePartitionsClearsOnlyLostRecords() {
+ TopicPartition tp0 = new TopicPartition("test", 0);
+ TopicPartition tp1 = new TopicPartition("test", 1);
+
+ consumer.subscribe(List.of("test"));
+ consumer.rebalance(List.of(tp0, tp1));
+ consumer.updateBeginningOffsets(new HashMap<>() {{
+ put(tp0, 0L);
+ put(tp1, 0L);
+ }});
+ consumer.seek(tp0, 0);
+ consumer.seek(tp1, 0);
+
+ consumer.addRecord(new ConsumerRecord<>("test", 0, 0, null, null));
+ consumer.addRecord(new ConsumerRecord<>("test", 1, 0, null, null));
+
+ consumer.losePartitions(List.of(tp0));
+
+ ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1));
+ assertEquals(1, records.count());
+
+ var record = records.iterator().next();
+ assertEquals(tp1, new TopicPartition(record.topic(),
record.partition()));
+ }
+
+ @Test
+ public void testLosePartitionsThenRebalance() {
+ TopicPartition tp0 = new TopicPartition("test", 0);
+ TopicPartition tp1 = new TopicPartition("test", 1);
+ TopicPartition tp2 = new TopicPartition("test", 2);
+
+ List<TopicPartition> assigned = new ArrayList<>();
+ consumer.subscribe(List.of("test"), new ConsumerRebalanceListener() {
+ @Override
+ public void onPartitionsRevoked(Collection<TopicPartition>
partitions) {}
+ @Override
+ public void onPartitionsAssigned(Collection<TopicPartition>
partitions) {
+ assigned.addAll(partitions);
+ }
+ });
+
+ consumer.rebalance(List.of(tp0, tp1));
+ assigned.clear();
+
+ consumer.losePartitions(List.of(tp0));
+ consumer.rebalance(Arrays.asList(tp1, tp2));
+
+ assertEquals(List.of(tp2), assigned);
+ assertEquals(Set.of(tp1, tp2), consumer.assignment());
+ }
}