This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 207e4d60285 MINOR: Address flaky wait for producer acks logging for checking when not al… (#20957)
207e4d60285 is described below

commit 207e4d60285dcfbd4736b0cd1bd2b39a600d788b
Author: Bill Bejeck <[email protected]>
AuthorDate: Tue Nov 25 15:23:19 2025 -0500

    MINOR: Address flaky wait for producer acks logging for checking when not al… (#20957)
    
    Adding the following for some system tests
    - Increased timeout for `await_produced_messages`
    - Added more logging for `test_fencing_consumer` which has exhibited
    flakiness and is hard to reproduce
    
    Reviewers: Lianet Magrans<[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py       | 26 ++++++++++++++++++++++++++
 tests/kafkatest/tests/client/consumer_test.py | 27 +++++++++++++++++++++------
 2 files changed, 47 insertions(+), 6 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index ff106425005..17fcc147429 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -2023,3 +2023,29 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
 
     def java_class_name(self):
         return "kafka\.Kafka"
+
+    def describe_consumer_group_members(self, group, node=None, command_config=None):
+        """ Describe a consumer group.
+        """
+        if node is None:
+            node = self.nodes[0]
+        consumer_group_script = self.path.script("kafka-consumer-groups.sh", node)
+
+        if command_config is None:
+            command_config = ""
+        else:
+            command_config = "--command-config " + command_config
+
+        cmd = fix_opts_for_new_jvm(node)
+        cmd += "%s --bootstrap-server %s %s --group %s --describe" % \
+               (consumer_group_script,
+                self.bootstrap_servers(self.security_protocol),
+                command_config, group)
+
+        cmd += " --members"
+
+        output_lines = []
+        for line in node.account.ssh_capture(cmd):
+            if not (line.startswith("SLF4J") or line.startswith("GROUP") or line.strip() == ""):
+                output_lines.append(line.strip())
+        return output_lines
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index c6ecfc371a6..4edaa498bfb 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -297,7 +297,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer = self.setup_producer(self.TOPIC)
 
         producer.start()
-        self.await_produced_messages(producer)
+        self.await_produced_messages(producer, timeout_sec=120)
 
         consumer = self.setup_consumer(self.TOPIC, static_membership=True, group_protocol=group_protocol)
 
@@ -324,18 +324,33 @@ class OffsetValidationTest(VerifiableConsumerTest):
                 # Consumer protocol: Existing members should remain active and new conflicting ones should not be able to join.
                 self.await_consumed_messages(consumer)
                 assert num_rebalances == consumer.num_rebalances(), "Static consumers attempt to join with instance id in use should not cause a rebalance"
-                assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                try:
+                    assert len(consumer.joined_nodes()) == len(consumer.nodes)
+                except AssertionError:
+                    self.logger.debug("All members not in group %s. Describe output is %s", self.group_id,
+                                      " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
+                    raise
                 assert len(conflict_consumer.joined_nodes()) == 0
-                
+
                 # Stop existing nodes, so conflicting ones should be able to join.
                 consumer.stop_all()
                 wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
                            timeout_sec=60,
-                           err_msg="Timed out waiting for the consumer to shutdown")
+                           err_msg="Timed out waiting for the consumer to shutdown. Describe output is %s" % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
+                # Wait until the group becomes empty to ensure the instance ID is released.
+                # We use the 60-second timeout because the consumer session timeout is 45 seconds adding some time for latency.
+                wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="empty"),
+                           timeout_sec=60,
+                           err_msg="Timed out waiting for the consumers to be removed from the group. Describe output is %s." % " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
+
                 conflict_consumer.start()
-                self.await_members(conflict_consumer, num_conflict_consumers)
+                try:
+                    self.await_members(conflict_consumer, num_conflict_consumers)
+                except TimeoutError:
+                    self.logger.debug("All conflict members not in group %s. Describe output is %s", self.group_id, " ".join(self.kafka.describe_consumer_group_members(self.group_id)))
+                    raise
 
-            
         else:
             consumer.start()
             conflict_consumer.start()

Reply via email to