This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 8367846e9b7 KAFKA-17272: [1/2] System test framework for consumer protocol migration (#16845)
8367846e9b7 is described below
commit 8367846e9b7fa8c4bd02201dc18ae12df9db625e
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Aug 14 09:47:51 2024 -0400
KAFKA-17272: [1/2] System test framework for consumer protocol migration (#16845)
This patch adds the necessary framework for system tests of consumer
protocol upgrade/downgrade paths. The change mainly includes:
- adding `ConsumerProtocolConsumerEventHandler` for consumers using the
new protocol.
- other fixes to consumer_test.py that adopt the new framework and address:
  - [KAFKA-16576](https://issues.apache.org/jira/browse/KAFKA-16576): Fixed
by getting `partition_owner` after the group is fully stabilized.
  - [KAFKA-17219](https://issues.apache.org/jira/browse/KAFKA-17219): The
first issue is the same as KAFKA-16576. The second issue is fixed by taking
`num_rebalances` after the group is fully stabilized.
  - [KAFKA-17295](https://issues.apache.org/jira/browse/KAFKA-17295): Same
as the second KAFKA-17219 issue. Fixed by taking `num_rebalances` after the
group is fully stabilized.
A test result of `tests/kafkatest/tests/client` is
[here](https://confluent-open-source-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/trunk/2024-08-13--001.54e3cf70-869c-465c-bd7a-2ec0c26b2f05--1723594100--confluentinc--kip-848-migration-system-test-framework-comment-aug12--2388f23da7/report.html).
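For reference, the shared fix pattern in the issues above is to wait for the
group to fully stabilize before sampling rebalance-sensitive values. A minimal
sketch, assuming the existing test helpers and a `partition` taken from the
test setup:

    consumer.start()
    self.await_all_members(consumer)
    # Sample only after the group is STABLE and the assignment is valid;
    # otherwise a trailing rebalance can invalidate the snapshot.
    self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
    num_rebalances = consumer.num_rebalances()
    partition_owner = consumer.owner(partition)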
Reviewers: David Jacot <[email protected]>
---
tests/kafkatest/services/kafka/kafka.py | 6 +-
tests/kafkatest/services/verifiable_consumer.py | 77 +++++++++++++++++------
tests/kafkatest/tests/client/consumer_test.py | 12 ++--
tests/kafkatest/tests/verifiable_consumer_test.py | 9 +++
4 files changed, 80 insertions(+), 24 deletions(-)
diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 6a23b6e6898..aa7ffe5ca91 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1755,7 +1755,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
return False
return True
- def list_consumer_groups(self, node=None, command_config=None):
+ def list_consumer_groups(self, node=None, command_config=None, state=None, type=None):
""" Get list of consumer groups.
"""
if node is None:
@@ -1772,6 +1772,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
(consumer_group_script,
self.bootstrap_servers(self.security_protocol),
command_config)
+ if state is not None:
+ cmd += " --state %s" % state
+ if type is not None:
+ cmd += " --type %s" % type
return self.run_cli_tool(node, cmd)
def describe_consumer_group(self, group, node=None, command_config=None):
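For illustration, the new filters could be used from a test roughly as
follows; a sketch only, assuming a `kafka` service handle and a `group_id` as
used elsewhere in the kafkatest suite (the `--type` value "consumer" is an
assumption about the CLI filter):

    # List only groups that are STABLE and use the new "consumer" protocol (sketch only).
    output = kafka.list_consumer_groups(state="stable", type="consumer")
    assert group_id in output, "group %s is not yet stable on the consumer protocol" % group_id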
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 0fd332f244d..de1e6f2a1f2 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -39,17 +39,19 @@ def _create_partition_from_dict(d):
class ConsumerEventHandler(object):
- def __init__(self, node, verify_offsets, idx):
+ def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
+ revoked_count=0, assigned_count=0, assignment=None,
+ position=None, committed=None, total_consumed=0):
self.node = node
- self.idx = idx
- self.state = ConsumerState.Dead
- self.revoked_count = 0
- self.assigned_count = 0
- self.assignment = []
- self.position = {}
- self.committed = {}
- self.total_consumed = 0
self.verify_offsets = verify_offsets
+ self.idx = idx
+ self.state = state
+ self.revoked_count = revoked_count
+ self.assigned_count = assigned_count
+ self.assignment = assignment if assignment is not None else []
+ self.position = position if position is not None else {}
+ self.committed = committed if committed is not None else {}
+ self.total_consumed = total_consumed
def handle_shutdown_complete(self, node=None, logger=None):
self.state = ConsumerState.Dead
@@ -145,10 +147,10 @@ class ConsumerEventHandler(object):
else:
return None
-# This needs to be used for cooperative and consumer protocol
+# This needs to be used for cooperative protocol.
class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
- def __init__(self, node, verify_offsets, idx):
- super().__init__(node, verify_offsets, idx)
+ def __init__(self, node, verify_offsets, idx, **kwargs):
+ super().__init__(node, verify_offsets, idx, **kwargs)
def handle_partitions_revoked(self, event, node, logger):
self.revoked_count += 1
@@ -176,6 +178,28 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
logger.debug("Partitions %s assigned to %s" % (assignment,
node.account.hostname))
self.assignment.extend(assignment)
+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+ def __init__(self, node, verify_offsets, idx, **kwargs):
+ super().__init__(node, verify_offsets, idx, **kwargs)
+
+ def handle_partitions_revoked(self, event, node, logger):
+ # The handler state is not transitioned to Rebalancing as the records can only be
+ # consumed in Joined state (see ConsumerEventHandler.handle_records_consumed).
+ # The consumer with consumer protocol should still be able to consume messages during rebalance.
+ self.revoked_count += 1
+ self.position = {}
+ revoked = []
+
+ for topic_partition in event["partitions"]:
+ tp = _create_partition_from_dict(topic_partition)
+ # tp existing in self.assignment is not guaranteed in the new consumer
+ # if it shuts down when revoking partitions for reconciliation.
+ if tp in self.assignment:
+ self.assignment.remove(tp)
+ revoked.append(tp)
+
+ logger.debug("Partitions %s revoked from %s" % (revoked,
node.account.hostname))
class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
"""This service wraps org.apache.kafka.tools.VerifiableConsumer for use in
@@ -245,13 +269,30 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
def java_class_name(self):
return "VerifiableConsumer"
+ def create_event_handler(self, idx, node):
+ def create_handler_helper(handler_class, node, idx, existing_handler=None):
+ if existing_handler is not None:
+ return handler_class(node, self.verify_offsets, idx,
+ state=existing_handler.state,
+ revoked_count=existing_handler.revoked_count,
+ assigned_count=existing_handler.assigned_count,
+ assignment=existing_handler.assignment,
+ position=existing_handler.position,
+ committed=existing_handler.committed,
+ total_consumed=existing_handler.total_consumed)
+ else:
+ return handler_class(node, self.verify_offsets, idx)
+ existing_handler = self.event_handlers[node] if node in self.event_handlers else None
+ if self.is_consumer_group_protocol_enabled():
+ return create_handler_helper(ConsumerProtocolConsumerEventHandler, node, idx, existing_handler)
+ elif self.is_eager():
+ return create_handler_helper(ConsumerEventHandler, node, idx, existing_handler)
+ else:
+ return create_handler_helper(IncrementalAssignmentConsumerEventHandler, node, idx, existing_handler)
+
def _worker(self, idx, node):
with self.lock:
- if node not in self.event_handlers:
- if self.is_eager():
- self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
- else:
- self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+ self.event_handlers[node] = self.create_event_handler(idx, node)
handler = self.event_handlers[node]
node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -481,4 +522,4 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
if handler.state != ConsumerState.Dead]
def is_consumer_group_protocol_enabled(self):
- return self.group_protocol and self.group_protocol.upper() == "CONSUMER"
+ return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
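In effect, `create_event_handler` selects the handler class from the
configured group protocol and, when a node already has a handler, seeds the
new one with the old counters and assignment. That is what lets a migration
test bounce a consumer onto a different protocol without losing its history;
a rough sketch, assuming the existing `VerifiableConsumer` service API and
that `group_protocol` can simply be reassigned between restarts:

    # Sketch only: restart one consumer node under the new "consumer" protocol.
    # revoked_count, assigned_count and total_consumed carry over because
    # create_event_handler copies them from the node's previous handler.
    consumer.stop_node(node)
    consumer.group_protocol = consumer_group.consumer_group_protocol
    consumer.start_node(node)
    self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)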
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index fed00b0aac2..6cb82869c4f 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -116,6 +116,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
consumer.start()
self.await_all_members(consumer)
+ self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
num_rebalances = consumer.num_rebalances()
# TODO: make this test work with hard shutdowns, which probably requires
@@ -204,7 +205,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
num_bounces=[5],
metadata_quorum=[quorum.isolated_kraft],
use_new_coordinator=[True],
- group_protocol=consumer_group.classic_group_protocol
+ group_protocol=[consumer_group.classic_group_protocol]
)
def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
"""
@@ -349,6 +350,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
if fencing_stage == "stable":
consumer.start()
self.await_members(consumer, len(consumer.nodes))
+ self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=120)
num_rebalances = consumer.num_rebalances()
conflict_consumer.start()
@@ -415,9 +417,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
consumer.start()
self.await_all_members(consumer)
-
+ self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
partition_owner = consumer.owner(partition)
- assert partition_owner is not None
# startup the producer and ensure that some records have been written
producer.start()
@@ -481,6 +482,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
producer.start()
consumer.start()
self.await_all_members(consumer)
+ self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
num_rebalances = consumer.num_rebalances()
@@ -491,7 +493,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
# ensure that the consumers do some work after the broker failure
self.await_consumed_messages(consumer, min_messages=1000)
- # verify that there were no rebalances on failover
+ # verify that there were no rebalances on failover.
assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
consumer.stop_all()
@@ -608,7 +610,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
consumer.start_node(node)
self.await_members(consumer, num_started)
wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
- timeout_sec=15,
+ timeout_sec=30,
err_msg="expected valid assignments of %d partitions when
num_started %d: %s" % \
(self.NUM_PARTITIONS, num_started, \
[(str(node.account), a) for node, a in
consumer.current_assignment().items()]))
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 38bb6cf3bd9..08da754732d 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -86,3 +86,12 @@ class VerifiableConsumerTest(KafkaTest):
def await_all_members(self, consumer):
self.await_members(consumer, self.num_consumers)
+
+ def await_all_members_stabilized(self, topic, num_partitions, consumer, timeout_sec):
+ # Wait until the group is in STABLE state and the consumers reconcile to a valid assignment
+ wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="stable"),
+ timeout_sec=timeout_sec,
+ err_msg="Timed out waiting for group %s to transition to
STABLE state." % self.group_id)
+ wait_until(lambda: self.valid_assignment(topic, num_partitions, consumer.current_assignment()),
+ timeout_sec=timeout_sec,
+ err_msg="Timeout awaiting for the consumers to reconcile to
a valid assignment.")