This is an automated email from the ASF dual-hosted git repository. lianetm pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 8a577fa5af0 MINOR: adding consumer fenced test and log (#19723) 8a577fa5af0 is described below commit 8a577fa5af010684ddeff2f43ca7d3692bba43a4 Author: Lianet Magrans <98415067+lian...@users.noreply.github.com> AuthorDate: Thu May 15 12:57:11 2025 -0400 MINOR: adding consumer fenced test and log (#19723) Adding a test that specifically forces the fencing path caused by a delayed rebalance, and validates how the consumer recovers automatically. Running this test with DEBUG logging enabled shows the details of the fencing flow: the consumer getting fenced because the rebalance timeout was exceeded, resetting to epoch 0, rejoining on the next poll with the existing subscription, and being accepted back into the group (so consumption resumes). This is aimed at helping to understand [KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233). Will add another test in a separate PR that also involves commits in similar fencing scenarios. Reviewers: TengYao Chi <frankvi...@apache.org> --- .../consumer/internals/CommitRequestManager.java | 2 + .../kafka/api/PlaintextConsumerPollTest.scala | 47 ++++++++++++++++++++++ 2 files changed, 49 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 9105300487f..b6a6a9f14cf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -589,6 +589,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener if (memberEpoch.isEmpty() && memberInfo.memberEpoch.isPresent()) { log.info("Member {} won't include epoch in following offset " + "commit/fetch requests because it has left the group.", memberInfo.memberId); + } else if (memberEpoch.isPresent()) { + log.debug("Member {} 
will include new member epoch {} in following offset commit/fetch requests.", memberId, memberEpoch); } memberInfo.memberId = memberId; memberInfo.memberEpoch = memberEpoch; diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala index 42e9c50fc4c..116ef396ee6 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala @@ -129,6 +129,53 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest { assertTrue(commitCompleted) } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) + @MethodSource(Array("getTestGroupProtocolParametersAll")) + def testConsumerRecoveryOnPollAfterDelayedRebalance(groupProtocol: String): Unit = { + val rebalanceTimeout = 1000 + this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, rebalanceTimeout.toString) + this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false.toString) + + val producer = createProducer() + val numMessages = 10 + createTopicAndSendRecords(producer, "otherTopic", 1, numMessages) + sendRecords(producer, numMessages, tp) + + var rebalanceTimeoutExceeded = false + + // Subscribe consumer that will reconcile in time on the first rebalance, but will + // take longer than the allowed timeout in the second rebalance (onPartitionsRevoked) to get fenced by the broker. + // The consumer should recover after being fenced (automatically rejoin the group on the next call to poll) + val consumer = createConsumer() + val listener = new TestConsumerReassignmentListener { + + override def onPartitionsRevoked(partitions: util.Collection[TopicPartition]): Unit = { + if (!partitions.isEmpty && partitions.contains(tp)) { + // on the second rebalance (after we have joined the group initially), sleep longer + // than rebalance timeout to get fenced. 
+ Utils.sleep(rebalanceTimeout + 500) + rebalanceTimeoutExceeded = true + } + super.onPartitionsRevoked(partitions) + } + } + + // Subscribe to get first assignment (no delays) and verify consumption + consumer.subscribe(List(topic).asJava, listener) + var records = awaitNonEmptyRecords(consumer, tp, 0L) + assertEquals(numMessages, records.count()) + + // Subscribe to different topic. This will trigger the delayed revocation exceeding rebalance timeout and get fenced + consumer.subscribe(List("otherTopic").asJava, listener) + TestUtils.pollUntilTrue(consumer, () => rebalanceTimeoutExceeded, "Timeout waiting for delayed callback to complete") + + // Verify consumer recovers after being fenced, being able to continue consuming. + // (The member should automatically rejoin on the next poll, with the new topic as subscription) + val tpOther = new TopicPartition("otherTopic", 0) + records = awaitNonEmptyRecords(consumer, tpOther, 0L) + assertEquals(numMessages, records.count()) + } + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames) @MethodSource(Array("getTestGroupProtocolParametersAll")) def testMaxPollIntervalMsDelayInAssignment(groupProtocol: String): Unit = {