This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 75768dd3383 MINOR: Prevent re-join flakiness in
test_fencing_static_consumer by ensuring conflicting static consumers terminate
(#20772)
75768dd3383 is described below
commit 75768dd3383aaa98edd243351718c987808f7dab
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Fri Oct 31 14:52:49 2025 +0800
MINOR: Prevent re-join flakiness in test_fencing_static_consumer by
ensuring conflicting static consumers terminate (#20772)
Related discussion:
https://github.com/apache/kafka/pull/20594#pullrequestreview-3362587453
### Problem
The test `OffsetValidationTest.test_fencing_static_consumer` failed when
executed with
`fencing_stage=stable` and `group_protocol=consumer`.
It timed out while waiting for the group to become empty because the
conflicting static consumers re-joined after the original members
stopped, keeping the group non-empty and causing the timeout.
### Fix
For the consumer-protocol path, the test now waits for all conflicting
consumer processes to terminate before stopping the original static
members. This ensures that each conflicting consumer is fully fenced
and cannot re-join the group after the original members stop.
Reviewers: Chia-Ping Tsai <[email protected]>
---
tests/kafkatest/services/verifiable_consumer.py | 14 ++++++++++++--
tests/kafkatest/tests/client/consumer_test.py | 15 ++++++++++++++-
2 files changed, 26 insertions(+), 3 deletions(-)
diff --git a/tests/kafkatest/services/verifiable_consumer.py
b/tests/kafkatest/services/verifiable_consumer.py
index 9a8a294a2aa..6e9716761f6 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -42,7 +42,8 @@ class ConsumerEventHandler(object):
def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
revoked_count=0, assigned_count=0, assignment=None,
- position=None, committed=None, total_consumed=0):
+ position=None, committed=None, total_consumed=0,
+ shutdown_complete=False):
self.node = node
self.verify_offsets = verify_offsets
self.idx = idx
@@ -53,11 +54,13 @@ class ConsumerEventHandler(object):
self.position = position if position is not None else {}
self.committed = committed if committed is not None else {}
self.total_consumed = total_consumed
+ self.shutdown_complete = shutdown_complete
def handle_shutdown_complete(self, node=None, logger=None):
self.state = ConsumerState.Dead
self.assignment = []
self.position = {}
+ self.shutdown_complete = True
if node is not None and logger is not None:
logger.debug("Shut down %s" % node.account.hostname)
@@ -277,7 +280,8 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
assignment=existing_handler.assignment,
position=existing_handler.position,
committed=existing_handler.committed,
-
total_consumed=existing_handler.total_consumed)
+
total_consumed=existing_handler.total_consumed,
+
shutdown_complete=existing_handler.shutdown_complete)
else:
return handler_class(node, self.verify_offsets, idx)
existing_handler = self.event_handlers[node] if node in
self.event_handlers else None
@@ -292,6 +296,7 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
with self.lock:
self.event_handlers[node] = self.create_event_handler(idx, node)
handler = self.event_handlers[node]
+ handler.shutdown_complete = False
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT,
allow_fail=False)
@@ -526,5 +531,10 @@ class VerifiableConsumer(KafkaPathResolverMixin,
VerifiableClientMixin, Backgrou
return [handler.node for handler in self.event_handlers.values()
if handler.state != ConsumerState.Dead]
+ def shutdown_complete_nodes(self):
+ with self.lock:
+ return [handler.node for handler in self.event_handlers.values()
+ if handler.shutdown_complete]
+
def is_consumer_group_protocol_enabled(self):
return self.group_protocol and self.group_protocol.lower() ==
consumer_group.consumer_group_protocol
diff --git a/tests/kafkatest/tests/client/consumer_test.py
b/tests/kafkatest/tests/client/consumer_test.py
index cb964e4c303..7a8614a1cd9 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -18,6 +18,7 @@ from ducktape.utils.util import wait_until
from ducktape.mark.resource import cluster
from kafkatest.tests.verifiable_consumer_test import VerifiableConsumerTest
+from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.services.kafka import TopicPartition, quorum, consumer_group
import signal
@@ -74,6 +75,14 @@ class OffsetValidationTest(VerifiableConsumerTest):
self.mark_for_collect(consumer, 'verifiable_consumer_stdout')
return consumer
+ def await_conflict_consumers_fenced(self, conflict_consumer):
+ # Rely on explicit shutdown_complete events from the verifiable
consumer to guarantee each conflict member
+ # reached the fenced path rather than remaining in the default DEAD
state prior to startup.
+ wait_until(lambda: len(conflict_consumer.shutdown_complete_nodes()) ==
len(conflict_consumer.nodes) and
+ len(conflict_consumer.dead_nodes()) ==
len(conflict_consumer.nodes),
+ timeout_sec=60,
+ err_msg="Timed out waiting for conflict consumers to report
shutdown completion after fencing")
+
@cluster(num_nodes=7)
@matrix(
metadata_quorum=[quorum.isolated_kraft],
@@ -326,7 +335,11 @@ class OffsetValidationTest(VerifiableConsumerTest):
assert num_rebalances == consumer.num_rebalances(), "Static
consumers attempt to join with instance id in use should not cause a rebalance"
assert len(consumer.joined_nodes()) == len(consumer.nodes)
assert len(conflict_consumer.joined_nodes()) == 0
-
+
+ # Conflict consumers will terminate due to a fatal
UnreleasedInstanceIdException error.
+ # Wait for termination to complete to prevent conflict
consumers from immediately re-joining the group while existing nodes are
shutting down.
+ self.await_conflict_consumers_fenced(conflict_consumer)
+
# Stop existing nodes, so conflicting ones should be able to
join.
consumer.stop_all()
wait_until(lambda: len(consumer.dead_nodes()) ==
len(consumer.nodes),