This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5725a514538 KAFKA-16460: New consumer times out consuming records in
multiple consumer_test.py system tests (#17777)
5725a514538 is described below
commit 5725a51453859163c01735052b0c15ee6a513b8e
Author: PoAn Yang <[email protected]>
AuthorDate: Sat Nov 16 02:41:39 2024 +0800
KAFKA-16460: New consumer times out consuming records in multiple
consumer_test.py system tests (#17777)
Reviewers: Lianet Magrans <[email protected]>
---
.../consumer/internals/AbstractMembershipManager.java | 4 ++++
.../consumer/internals/ConsumerMembershipManagerTest.java | 12 ++++++++++--
.../consumer/internals/ShareMembershipManagerTest.java | 3 ++-
3 files changed, 16 insertions(+), 3 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
index 82b4e567d34..c6aa70d805e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java
@@ -851,6 +851,10 @@ public abstract class AbstractMembershipManager<R extends
AbstractResponse> impl
revokedPartitions
);
+ // Mark partitions as pending revocation to stop fetching from the
partitions (no new
+ // fetches sent out, and no in-flight fetches responses processed).
+ markPendingRevocationToPauseFetching(revokedPartitions);
+
// Commit offsets if auto-commit enabled before reconciling a new
assignment. Request will
// be retried until it succeeds, fails with non-retriable error, or
timer expires.
CompletableFuture<Void> commitResult;
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index 7faf4cc55c3..9d09aee2697 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -45,6 +45,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
+import org.mockito.InOrder;
import java.util.ArrayList;
import java.util.Arrays;
@@ -86,6 +87,7 @@ import static org.mockito.ArgumentMatchers.anySet;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -1433,6 +1435,7 @@ public class ConsumerMembershipManagerTest {
membershipManager.poll(time.milliseconds());
testRevocationOfAllPartitionsCompleted(membershipManager);
+ verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
}
@Test
@@ -1456,6 +1459,10 @@ public class ConsumerMembershipManagerTest {
// Complete commit request
commitResult.complete(null);
+ InOrder inOrder = inOrder(subscriptionState, commitRequestManager);
+ inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
+
inOrder.verify(commitRequestManager).maybeAutoCommitSyncBeforeRevocation(anyLong());
+ inOrder.verify(subscriptionState).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
testRevocationOfAllPartitionsCompleted(membershipManager);
}
@@ -1480,6 +1487,7 @@ public class ConsumerMembershipManagerTest {
// Complete commit request
commitResult.completeExceptionally(new KafkaException("Commit request
failed with " +
"non-retriable error"));
+ verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition("topic1", 0)));
testRevocationOfAllPartitionsCompleted(membershipManager);
}
@@ -1579,11 +1587,11 @@ public class ConsumerMembershipManagerTest {
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId,
topicName, Collections.emptyList());
// Member received assignment to reconcile;
-
receiveAssignment(topicId, Arrays.asList(0, 1), membershipManager);
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
+ verify(subscriptionState).markPendingRevocation(Set.of());
// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1607,6 +1615,7 @@ public class ConsumerMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
membershipManager.poll(time.milliseconds());
+ verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition(topicName, 0)));
// Revocation should complete without requesting any metadata update
given that the topic
// received in target assignment should exist in local topic name
cache.
@@ -2551,7 +2560,6 @@ public class ConsumerMembershipManagerTest {
assertEquals(assignmentByTopicId,
membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());
- verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new
HashSet<>(expectedTopicPartitionAssignment);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
index 36e48250714..7c4c5684bcc 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManagerTest.java
@@ -1100,6 +1100,7 @@ public class ShareMembershipManagerTest {
verifyReconciliationNotTriggered(membershipManager);
membershipManager.poll(time.milliseconds());
+ verify(subscriptionState).markPendingRevocation(Set.of());
// Member should complete reconciliation
assertEquals(MemberState.ACKNOWLEDGING, membershipManager.state());
@@ -1123,6 +1124,7 @@ public class ShareMembershipManagerTest {
receiveAssignment(topicId, Collections.singletonList(1),
membershipManager);
membershipManager.poll(time.milliseconds());
+ verify(subscriptionState, times(2)).markPendingRevocation(Set.of(new
TopicPartition(topicName, 0)));
// Revocation should complete without requesting any metadata update
given that the topic
// received in target assignment should exist in local topic name
cache.
@@ -1423,7 +1425,6 @@ public class ShareMembershipManagerTest {
assertEquals(assignmentByTopicId,
membershipManager.currentAssignment().partitions);
assertFalse(membershipManager.reconciliationInProgress());
- verify(subscriptionState).markPendingRevocation(anySet());
List<TopicPartition> expectedTopicPartitionAssignment =
buildTopicPartitions(expectedCurrentAssignment);
HashSet<TopicPartition> expectedSet = new
HashSet<>(expectedTopicPartitionAssignment);