This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8a577fa5af0 MINOR: adding consumer fenced test and log (#19723)
8a577fa5af0 is described below

commit 8a577fa5af010684ddeff2f43ca7d3692bba43a4
Author: Lianet Magrans <98415067+lian...@users.noreply.github.com>
AuthorDate: Thu May 15 12:57:11 2025 -0400

    MINOR: adding consumer fenced test and log (#19723)
    
    Adds a test that specifically forces the fencing path caused by a delayed
    rebalance, and validates that the consumer recovers automatically.
    
    Running this test with DEBUG logging enabled shows the details of the
    fencing flow: the consumer gets fenced because the rebalance timeout is
    exceeded, resets to epoch 0, rejoins on the next poll with its current
    subscription, and is accepted back into the group (so consumption
    resumes).
    
    This is aimed at helping to understand
    [KAFKA-19233](https://issues.apache.org/jira/browse/KAFKA-19233).
    
    Another test, also involving commits in similar fencing scenarios, will
    be added in a separate PR.
    
    Reviewers: TengYao Chi <frankvi...@apache.org>
---
 .../consumer/internals/CommitRequestManager.java   |  2 +
 .../kafka/api/PlaintextConsumerPollTest.scala      | 47 ++++++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
index 9105300487f..b6a6a9f14cf 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java
@@ -589,6 +589,8 @@ public class CommitRequestManager implements RequestManager, MemberStateListener
         if (memberEpoch.isEmpty() && memberInfo.memberEpoch.isPresent()) {
             log.info("Member {} won't include epoch in following offset " +
                 "commit/fetch requests because it has left the group.", 
memberInfo.memberId);
+        } else if (memberEpoch.isPresent()) {
+            log.debug("Member {} will include new member epoch {} in following 
offset commit/fetch requests.", memberId, memberEpoch);
         }
         memberInfo.memberId = memberId;
         memberInfo.memberEpoch = memberEpoch;
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
index 42e9c50fc4c..116ef396ee6 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala
@@ -129,6 +129,53 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
     assertTrue(commitCompleted)
   }
 
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
+  @MethodSource(Array("getTestGroupProtocolParametersAll"))
+  def testConsumerRecoveryOnPollAfterDelayedRebalance(groupProtocol: String): 
Unit = {
+    val rebalanceTimeout = 1000
+    
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 
rebalanceTimeout.toString)
+    this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false.toString)
+
+    val producer = createProducer()
+    val numMessages = 10
+    createTopicAndSendRecords(producer, "otherTopic", 1, numMessages)
+    sendRecords(producer, numMessages, tp)
+
+    var rebalanceTimeoutExceeded = false
+
+    // Subscribe consumer that will reconcile in time on the first rebalance, 
but will
+    // take longer than the allowed timeout in the second rebalance 
(onPartitionsRevoked) to get fenced by the broker.
+    // The consumer should recover after being fenced (automatically rejoin 
the group on the next call to poll)
+    val consumer = createConsumer()
+    val listener = new TestConsumerReassignmentListener {
+
+      override def onPartitionsRevoked(partitions: 
util.Collection[TopicPartition]): Unit = {
+        if (!partitions.isEmpty && partitions.contains(tp)) {
+          // on the second rebalance (after we have joined the group 
initially), sleep longer
+          // than rebalance timeout to get fenced.
+          Utils.sleep(rebalanceTimeout + 500)
+          rebalanceTimeoutExceeded = true
+        }
+        super.onPartitionsRevoked(partitions)
+      }
+    }
+
+    // Subscribe to get first assignment (no delays) and verify consumption
+    consumer.subscribe(List(topic).asJava, listener)
+    var records = awaitNonEmptyRecords(consumer, tp, 0L)
+    assertEquals(numMessages, records.count())
+
+    // Subscribe to different topic. This will trigger the delayed revocation 
exceeding rebalance timeout and get fenced
+    consumer.subscribe(List("otherTopic").asJava, listener)
+    TestUtils.pollUntilTrue(consumer, () => rebalanceTimeoutExceeded, "Timeout 
waiting for delayed callback to complete")
+
+    // Verify consumer recovers after being fenced, being able to continue 
consuming.
+    // (The member should automatically rejoin on the next poll, with the new 
topic as subscription)
+    val tpOther = new TopicPartition("otherTopic", 0)
+    records = awaitNonEmptyRecords(consumer, tpOther, 0L)
+    assertEquals(numMessages, records.count())
+  }
+
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testMaxPollIntervalMsDelayInAssignment(groupProtocol: String): Unit = {

Reply via email to