This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 70e4540b638 MINOR: Ignore unassigned records in MockConsumer (#21631)
70e4540b638 is described below

commit 70e4540b63801ee28f2ca12ce6133a86f0adeb43
Author: Nick Telford <[email protected]>
AuthorDate: Fri Mar 6 00:17:14 2026 +0000

    MINOR: Ignore unassigned records in MockConsumer (#21631)
    
    In `poll()`, `MockConsumer` iterates the added `records`, and only
    checks that they belong to an assigned partition quite late in the
    process.
    
    This can cause an error if records were added for a partition that was
    later removed from the assignment:
    
    ```java
    consumer.assign(Arrays.asList(new TopicPartition("t1", 0), new
    TopicPartition("t2", 0)));
    consumer.addRecord(new ConsumerRecord("t1", 0, 0, "a", 123));
    consumer.addRecord(new ConsumerRecord("t2", 0, 0, "b", 123));
    
    consumer.assign(Collections.singleton(new TopicPartition("t1", 0)));
    
    // throws IllegalStateException
    consumer.poll(Duration.seconds(1));
    ```
    
    Moving this check earlier in the process avoids this error in `poll()`,
    instead discarding records that do not belong to the assignment.
    
    This enables tests to proactively add all their records and then modify
    the assignment multiple times, if necessary, to test specific
    combinations of partitions.
    
    This is particularly useful in some of the `streams` tests, where we
    need to test with multiple state store changelogs, but the assignment
    gets changed internally by Kafka Streams (notably,
    `GlobalStateManagerImplTest`).
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java | 4 ++--
 .../apache/kafka/streams/processor/internals/StreamThreadTest.java    | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index 395f26338cd..f727eb68bea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -284,7 +284,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
         while (partitionsIter.hasNext() && numPollRecords < 
this.maxPollRecords) {
             Map.Entry<TopicPartition, List<ConsumerRecord<K, V>>> entry = 
partitionsIter.next();
 
-            if (!subscriptions.isPaused(entry.getKey())) {
+            if (!subscriptions.isPaused(entry.getKey()) && 
subscriptions.isAssigned(entry.getKey())) {
                 final Iterator<ConsumerRecord<K, V>> recIterator = 
entry.getValue().iterator();
                 while (recIterator.hasNext()) {
                     if (numPollRecords >= this.maxPollRecords) {
@@ -298,7 +298,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> {
                         throw new 
OffsetOutOfRangeException(Collections.singletonMap(entry.getKey(), position));
                     }
 
-                    if (assignment().contains(entry.getKey()) && rec.offset() 
>= position) {
+                    if (rec.offset() >= position) {
                         results.computeIfAbsent(entry.getKey(), partition -> 
new ArrayList<>()).add(rec);
                         Metadata.LeaderAndEpoch leaderAndEpoch = new 
Metadata.LeaderAndEpoch(Optional.empty(), rec.leaderEpoch());
                         SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 8db09dba11e..3d5c59f7f80 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -1456,7 +1456,7 @@ public class StreamThreadTest {
 
         assertThat(thrown.getCause(), isA(IllegalStateException.class));
         // The Mock consumer shall throw as the assignment has been wiped out, 
but records are assigned.
-        assertEquals("No current assignment for partition topic1-1", 
thrown.getCause().getMessage());
+        assertEquals("Cannot add records for a partition that is not assigned 
to the consumer", thrown.getCause().getMessage());
         assertFalse(consumer.shouldRebalance());
 
         verify(taskManager).handleLostAll();

Reply via email to