This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 70e4540b638 MINOR: Ignore unassigned records in MockConsumer (#21631)
70e4540b638 is described below
commit 70e4540b63801ee28f2ca12ce6133a86f0adeb43
Author: Nick Telford <[email protected]>
AuthorDate: Fri Mar 6 00:17:14 2026 +0000
MINOR: Ignore unassigned records in MockConsumer (#21631)
In `poll()`, `MockConsumer` iterates over the added `records` and checks
that each one belongs to an assigned partition only late in the
process.
This can cause an error if records were added for a partition that was
later removed from the assignment:
```java
consumer.assign(Arrays.asList(new TopicPartition("t1", 0), new
TopicPartition("t2", 0)));
consumer.addRecord(new ConsumerRecord("t1", 0, 0, "a", 123));
consumer.addRecord(new ConsumerRecord("t2", 0, 0, "b", 123));
consumer.assign(Collections.singleton(new TopicPartition("t1", 0)));
// throws IllegalStateException
consumer.poll(Duration.ofSeconds(1));
```
Moving this check earlier in the process avoids this error in `poll()`,
instead discarding records that do not belong to the assignment.
This enables tests to proactively add all their records and then modify
the assignment multiple times, if necessary, to test specific
combinations of partitions.
This is particularly useful in some of the `streams` tests, where we
need to test with multiple state store changelogs, but the assignment
gets changed internally by Kafka Streams (notably,
`GlobalStateManagerImplTest`).
Reviewers: Bill Bejeck <[email protected]>
---
.../src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java | 4 ++--
.../apache/kafka/streams/processor/internals/StreamThreadTest.java | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 395f26338cd..f727eb68bea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -284,7 +284,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
while (partitionsIter.hasNext() && numPollRecords <
this.maxPollRecords) {
Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry =
partitionsIter.next();
- if (!subscriptions.isPaused(entry.getKey())) {
+ if (!subscriptions.isPaused(entry.getKey()) &&
subscriptions.isAssigned(entry.getKey())) {
final Iterator<ConsumerRecord<K, V>> recIterator =
entry.getValue().iterator();
while (recIterator.hasNext()) {
if (numPollRecords >= this.maxPollRecords) {
@@ -298,7 +298,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
throw new
OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
}
- if (assignment().contains(entry.getKey()) && rec.offset()
>= position) {
+ if (rec.offset() >= position) {
results.computeIfAbsent(entry.getKey(), partition ->
new ArrayList<>()).add(rec);
Metadata.LeaderAndEpoch leaderAndEpoch = new
Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
SubscriptionState.FetchPosition newPosition = new
SubscriptionState.FetchPosition(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8db09dba11e..3d5c59f7f80 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1456,7 +1456,7 @@ public class StreamThreadTest {
assertThat(thrown.getCause(), isA(IllegalStateException.class));
// The Mock consumer shall throw as the assignment has been wiped out,
but records are assigned.
- assertEquals("No current assignment for partition topic1-1",
thrown.getCause().getMessage());
+ assertEquals("Cannot add records for a partition that is not assigned
to the consumer", thrown.getCause().getMessage());
assertFalse(consumer.shouldRebalance());
verify(taskManager).handleLostAll();