This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new e1d831396d8 MINOR: cherry-pick from trunk fix for flaky fence static consumer system tests (#21034)
e1d831396d8 is described below
commit e1d831396d86fce76c8c3f1c38bf330f95643a8c
Author: Bill Bejeck <[email protected]>
AuthorDate: Mon Dec 1 16:27:09 2025 -0500
MINOR: cherry-pick from trunk fix for flaky fence static consumer system tests (#21034)
cherry-picked from trunk:
```
MINOR: Prevent re-join flakiness in test_fencing_static_consumer by ensuring conflicting static consumers terminate (#20772)
Related discussion:
https://github.com/apache/kafka/pull/20594#pullrequestreview-3362587453
### Problem
The test `OffsetValidationTest.test_fencing_static_consumer` failed when
executed with
`fencing_stage=stable` and `group_protocol=consumer`. It timed out while
waiting for the group to become empty because the conflicting static
consumers re-joined after the original members stopped, keeping the
group non-empty and causing the timeout.
### Fix
For the consumer-protocol path, the test now waits for all conflicting
consumer processes to terminate before stopping the original static
members. This ensures that each conflicting consumer is fully fenced
and cannot re-join the group after the original members stop.
Reviewers: Chia-Ping Tsai <[email protected]>
```
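For reference, the waiting step described in the Fix section boils down to the condition sketched below. This is a condensed illustration rather than additional code in the patch; `shutdown_complete_nodes()`, `dead_nodes()`, and the 60-second timeout come from the diff further down, and `wait_until` is ducktape's polling helper.
```python
from ducktape.utils.util import wait_until

def await_conflict_consumers_fenced(conflict_consumer, timeout_sec=60):
    # A conflict consumer counts as fully fenced only once it has reported a
    # shutdown_complete event and is in the Dead state, so it cannot re-join
    # after the original static members are stopped.
    wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
                       len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
               timeout_sec=timeout_sec,
               err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
```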
Reviewers: Lianet Magrans <[email protected]>
Co-authored-by: Hong-Yi Chen <[email protected]>
---
tests/kafkatest/services/verifiable_consumer.py | 14 ++++++++++++--
tests/kafkatest/tests/client/consumer_test.py | 13 +++++++++++++
2 files changed, 25 insertions(+), 2 deletions(-)
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 8264566f1c2..2e19bc6757a 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
revoked_count=0, assigned_count=0, assignment=None,
- position=None, committed=None, total_consumed=0):
+ position=None, committed=None, total_consumed=0,
+ shutdown_complete=False):
self.node = node
self.verify_offsets = verify_offsets
self.idx = idx
@@ -53,11 +54,13 @@ class ConsumerEventHandler(object):
self.position = position if position is not None else {}
self.committed = committed if committed is not None else {}
self.total_consumed = total_consumed
+ self.shutdown_complete = shutdown_complete
def handle_shutdown_complete(self, node=None, logger=None):
self.state = ConsumerState.Dead
self.assignment = []
self.position = {}
+ self.shutdown_complete = True
if node is not None and logger is not None:
logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
assignment=existing_handler.assignment,
position=existing_handler.position,
committed=existing_handler.committed,
- total_consumed=existing_handler.total_consumed)
+ total_consumed=existing_handler.total_consumed,
+ shutdown_complete=existing_handler.shutdown_complete)
else:
return handler_class(node, self.verify_offsets, idx)
existing_handler = self.event_handlers[node] if node in self.event_handlers else None
@@ -292,6 +296,7 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
with self.lock:
self.event_handlers[node] = self.create_event_handler(idx, node)
handler = self.event_handlers[node]
+ handler.shutdown_complete = False
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -521,5 +526,10 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
return [handler.node for handler in self.event_handlers.values()
if handler.state != ConsumerState.Dead]
+ def shutdown_complete_nodes(self):
+ with self.lock:
+ return [handler.node for handler in self.event_handlers.values()
+ if handler.shutdown_complete]
+
def is_consumer_group_protocol_enabled(self):
return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
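As an aside on how the new flag gets set: the VerifiableConsumer service tails the verifiable consumer's stdout and routes each event to a handler method such as `handle_shutdown_complete` shown above. The snippet below is only a hedged illustration of that dispatch path; the JSON-lines format and the "shutdown_complete" event name are assumptions inferred from the handler names, not code from this patch.
```python
import json

def dispatch_event(handler, line, node=None, logger=None):
    # Assumed shape: one JSON object per stdout line, with a "name" field
    # identifying the event type emitted by the verifiable consumer.
    event = json.loads(line)
    if event.get("name") == "shutdown_complete":
        # Marks the handler Dead and sets shutdown_complete = True, which is
        # what the shutdown_complete_nodes() helper added above later checks.
        handler.handle_shutdown_complete(node=node, logger=logger)
```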
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index 4edaa498bfb..85a0282e291 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@ from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
import signal
@@ -74,6 +75,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer
+ def await_conflict_consumers_fenced(self, conflict_consumer):
+ # Rely on explicit shutdown_complete events from the verifiable consumer to guarantee each conflict member
+ # reached the fenced path rather than remaining in the default DEAD state prior to startup.
+ wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) == len(conflict_consumer.nodes) and
+ len(conflict_consumer.dead_nodes()) == len(conflict_consumer.nodes),
+ timeout_sec=60,
+ err_msg="Timed out waiting for conflict consumers to report shutdown completion after fencing")
+
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
@@ -332,6 +341,10 @@ class OffsetValidationTest(VerifiableConsumerTest):
raise
assert len(conflict_consumer.joined_nodes()) == 0
+ # Conflict consumers will terminate due to a fatal UnreleasedInstanceIdException error.
+ # Wait for termination to complete to prevent conflict consumers from immediately re-joining the group while existing nodes are shutting down.
+ self.await_conflict_consumers_fenced(conflict_consumer)
+
# Stop existing nodes, so conflicting ones should be able to join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
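For orientation, here is a condensed sketch of the ordering this hunk enforces in test_fencing_static_consumer. The method names match the diff above; the wrapper function name, the timeout, and the error message in the final wait are illustrative placeholders rather than values taken from the patch.
```python
def _fence_conflicts_then_stop_originals(self, consumer, conflict_consumer):
    # Under the consumer group protocol, the conflicting static members are
    # fenced and must never have joined the group.
    assert len(conflict_consumer.joined_nodes()) == 0
    # New step from this patch: wait until every fenced conflict consumer has
    # fully terminated, so none of them can re-join while the originals leave.
    self.await_conflict_consumers_fenced(conflict_consumer)
    # Only then stop the original static members and wait for them to report dead.
    consumer.stop_all()
    wait_until(lambda: len(consumer.dead_nodes()) == len(consumer.nodes),
               timeout_sec=60,
               err_msg="Timed out waiting for the original static members to shut down")
```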