This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5725a514538 KAFKA-16460: New consumer times out consuming records in multiple consumer_test.py system tests (#17777)
5725a514538 is described below

commit 5725a51453859163c01735052b0c15ee6a513b8e
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Nov 16 02:41:39 2024 +0800

    KAFKA-16460: New consumer times out consuming records in multiple consumer_test.py system tests (#17777)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../consumer/internals/AbstractMembershipManager.java        |  4 ++++
 .../consumer/internals/ConsumerMembershipManagerTest.java    | 12 ++++++++++--
 .../consumer/internals/ShareMembershipManagerTest.java       |  3 ++-
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 82b4e567d34..c6aa70d805e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -851,6 +851,10 @@ public abstract class AbstractMembershipManager<R extends 
AbstractResponse> impl
                 revokedPartitions
         );
 
+        // Mark partitions as pending revocation to stop fetching from the partitions (no new
+        // fetches sent out, and no in-flight fetches responses processed).
+        markPendingRevocationToPauseFetching(revokedPartitions);
+
         // Commit offsets if auto-commit enabled before reconciling a new assignment. Request will
         // be retried until it succeeds, fails with non-retriable error, or timer expires.
         CompletableFuture<Void> commitResult;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 7faf4cc55c3..9d09aee2697 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.InOrder;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -86,6 +87,7 @@ import static org.mockito.ArgumentMatchers.anySet;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -1433,6 +1435,7 @@ public class ConsumerMembershipManagerTest {
         membershipManager.poll(time.milliseconds());
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
+        verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
     }
 
     @Test
@@ -1456,6 +1459,10 @@ public class ConsumerMembershipManagerTest {
 
         // Complete commit request
         commitResult.complete(null);
+        InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
+        inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
+        
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
+        inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
     }
@@ -1480,6 +1487,7 @@ public class ConsumerMembershipManagerTest {
         // Complete commit request
         commitResult.completeExceptionally(new KafkaException("Commit request 
failed with " +
                 "non-retriable error"));
+        verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition("topic1", 0)));
 
         testRevocationOfAllPartitionsCompleted(membershipManager);
     }
@@ -1579,11 +1587,11 @@ public class ConsumerMembershipManagerTest {
         mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, 
topicName, Collections.emptyList());
 
         // Member received assignment to reconcile;
-
         receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
+        verify(subscriptionState).markPendingRevocation(Set.of());
 
         // Member should complete reconciliation
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1607,6 +1615,7 @@ public class ConsumerMembershipManagerTest {
         receiveAssignment(topicId, Collections.singletonList(1), 
membershipManager);
 
         membershipManager.poll(time.milliseconds());
+        verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition(topicName, 0)));
 
         // Revocation should complete without requesting any metadata update given that the topic
         // received in target assignment should exist in local topic name cache.
@@ -2551,7 +2560,6 @@ public class ConsumerMembershipManagerTest {
         assertEquals(assignmentByTopicId, 
membershipManager.currentAssignment().partitions);
         assertFalse(membershipManager.reconciliationInProgress());
 
-        verify(subscriptionState).markPendingRevocation(anySet());
         List<TopicPartition> expectedTopicPartitionAssignment =
                 buildTopicPartitions(expectedCurrentAssignment);
         HashSet<TopicPartition> expectedSet = new 
HashSet<>(expectedTopicPartitionAssignment);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 36e48250714..7c4c5684bcc 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -1100,6 +1100,7 @@ public class ShareMembershipManagerTest {
 
         verifyReconciliationNotTriggered(membershipManager);
         membershipManager.poll(time.milliseconds());
+        verify(subscriptionState).markPendingRevocation(Set.of());
 
         // Member should complete reconciliation
         assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1123,6 +1124,7 @@ public class ShareMembershipManagerTest {
         receiveAssignment(topicId, Collections.singletonList(1), 
membershipManager);
 
         membershipManager.poll(time.milliseconds());
+        verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new 
TopicPartition(topicName, 0)));
 
         // Revocation should complete without requesting any metadata update given that the topic
         // received in target assignment should exist in local topic name cache.
@@ -1423,7 +1425,6 @@ public class ShareMembershipManagerTest {
         assertEquals(assignmentByTopicId, 
membershipManager.currentAssignment().partitions);
         assertFalse(membershipManager.reconciliationInProgress());
 
-        verify(subscriptionState).markPendingRevocation(anySet());
         List<TopicPartition> expectedTopicPartitionAssignment =
                 buildTopicPartitions(expectedCurrentAssignment);
         HashSet<TopicPartition> expectedSet = new 
HashSet<>(expectedTopicPartitionAssignment);

Reply via email to