This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 8367846e9b7 KAFKA-17272: [1/2] System test framework for consumer protocol migration (#16845)
8367846e9b7 is described below

commit 8367846e9b7fa8c4bd02201dc18ae12df9db625e
Author: Dongnuo Lyu <[email protected]>
AuthorDate: Wed Aug 14 09:47:51 2024 -0400

    KAFKA-17272: [1/2] System test framework for consumer protocol migration (#16845)
    
    This patch adds the necessary framework for system tests of consumer protocol upgrade/downgrade paths. The change mainly includes
    - adding `ConsumerProtocolConsumerEventHandler` for the consumers using the new protocol.
    - other fixes to consumer_test.py enabled by the new framework, which address
      - [KAFKA-16576](https://issues.apache.org/jira/browse/KAFKA-16576): fixed by getting `partition_owner` after the group is fully stabilized.
      - [KAFKA-17219](https://issues.apache.org/jira/browse/KAFKA-17219): the first issue is the same as KAFKA-16576; the second issue is fixed by taking `num_rebalances` after the group is fully stabilized.
      - [KAFKA-17295](https://issues.apache.org/jira/browse/KAFKA-17295): the same as the second KAFKA-17219 issue, fixed by taking `num_rebalances` after the group is fully stabilized.
    
    A test result of `tests/kafkatest/tests/client` is [here](https://confluent-open-source-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/trunk/2024-08-13--001.54e3cf70-869c-465c-bd7a-2ec0c26b2f05--1723594100--confluentinc--kip-848-migration-system-test-framework-comment-aug12--2388f23da7/report.html).
    
    Reviewers: David Jacot <[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py           |  6 +-
 tests/kafkatest/services/verifiable_consumer.py   | 77 +++++++++++++++++------
 tests/kafkatest/tests/client/consumer_test.py     | 12 ++--
 tests/kafkatest/tests/verifiable_consumer_test.py |  9 +++
 4 files changed, 80 insertions(+), 24 deletions(-)
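
A migration test built on this framework might look roughly like the sketch below. This is illustrative only and not part of the commit: the test class, topic constants, and bounce loop are assumptions (including `setup_consumer` and mutating `group_protocol` between restarts); only `await_all_members_stabilized` and the handler carry-over behavior come from this patch.

```python
# Hypothetical upgrade-path sketch (not in this commit).
class ConsumerProtocolMigrationSketch(VerifiableConsumerTest):
    TOPIC = "test_topic"      # placeholder topic
    NUM_PARTITIONS = 6        # placeholder partition count

    def test_upgrade_to_consumer_protocol(self):
        # Start all members on the classic protocol and wait for a
        # fully stabilized group before sampling any group state.
        consumer = self.setup_consumer(self.TOPIC, group_protocol="classic")
        consumer.start()
        self.await_all_members(consumer)
        self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS,
                                          consumer, timeout_sec=60)

        # Roll each member onto the new consumer protocol; the rebuilt
        # event handler inherits its counters via create_event_handler.
        consumer.group_protocol = "consumer"
        for node in consumer.nodes:
            consumer.stop_node(node)
            consumer.start_node(node)
        self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS,
                                          consumer, timeout_sec=60)
```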

diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py
index 6a23b6e6898..aa7ffe5ca91 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1755,7 +1755,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
                 return False
         return True
 
-    def list_consumer_groups(self, node=None, command_config=None):
+    def list_consumer_groups(self, node=None, command_config=None, state=None, type=None):
         """ Get list of consumer groups.
         """
         if node is None:
@@ -1772,6 +1772,10 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
               (consumer_group_script,
                self.bootstrap_servers(self.security_protocol),
                command_config)
+        if state is not None:
+            cmd += " --state %s" % state
+        if type is not None:
+            cmd += " --type %s" % type
         return self.run_cli_tool(node, cmd)
 
     def describe_consumer_group(self, group, node=None, command_config=None):
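
The new `state` and `type` parameters simply append `--state`/`--type` filters to the `kafka-consumer-groups.sh` invocation built above. A minimal usage sketch, assuming a `KafkaService` instance named `kafka` and a placeholder group id:

```python
# List only the groups the coordinator reports as STABLE; run_cli_tool
# returns the raw CLI output, so a membership check on it is enough here.
stable = kafka.list_consumer_groups(state="stable")
assert "test_group_id" in stable  # "test_group_id" is a placeholder

# The type filter works the same way, e.g. classic vs. consumer groups.
consumer_groups = kafka.list_consumer_groups(type="consumer")
```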
diff --git a/tests/kafkatest/services/verifiable_consumer.py b/tests/kafkatest/services/verifiable_consumer.py
index 0fd332f244d..de1e6f2a1f2 100644
--- a/tests/kafkatest/services/verifiable_consumer.py
+++ b/tests/kafkatest/services/verifiable_consumer.py
@@ -39,17 +39,19 @@ def _create_partition_from_dict(d):
 
 class ConsumerEventHandler(object):
 
-    def __init__(self, node, verify_offsets, idx):
+    def __init__(self, node, verify_offsets, idx, state=ConsumerState.Dead,
+                 revoked_count=0, assigned_count=0, assignment=None,
+                 position=None, committed=None, total_consumed=0):
         self.node = node
-        self.idx = idx
-        self.state = ConsumerState.Dead
-        self.revoked_count = 0
-        self.assigned_count = 0
-        self.assignment = []
-        self.position = {}
-        self.committed = {}
-        self.total_consumed = 0
         self.verify_offsets = verify_offsets
+        self.idx = idx
+        self.state = state
+        self.revoked_count = revoked_count
+        self.assigned_count = assigned_count
+        self.assignment = assignment if assignment is not None else []
+        self.position = position if position is not None else {}
+        self.committed = committed if committed is not None else {}
+        self.total_consumed = total_consumed
 
     def handle_shutdown_complete(self, node=None, logger=None):
         self.state = ConsumerState.Dead
@@ -145,10 +147,10 @@ class ConsumerEventHandler(object):
         else:
             return None
 
-# This needs to be used for cooperative and consumer protocol
+# This needs to be used for cooperative protocol.
 class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
-    def __init__(self, node, verify_offsets, idx):
-        super().__init__(node, verify_offsets, idx)
+    def __init__(self, node, verify_offsets, idx, **kwargs):
+        super().__init__(node, verify_offsets, idx, **kwargs)
         
     def handle_partitions_revoked(self, event, node, logger):
         self.revoked_count += 1
@@ -176,6 +178,28 @@ class IncrementalAssignmentConsumerEventHandler(ConsumerEventHandler):
         logger.debug("Partitions %s assigned to %s" % (assignment, node.account.hostname))
         self.assignment.extend(assignment)
 
+# This needs to be used for consumer protocol.
+class ConsumerProtocolConsumerEventHandler(IncrementalAssignmentConsumerEventHandler):
+    def __init__(self, node, verify_offsets, idx, **kwargs):
+        super().__init__(node, verify_offsets, idx, **kwargs)
+
+    def handle_partitions_revoked(self, event, node, logger):
+        # The handler state is not transitioned to Rebalancing as the records can only be
+        # consumed in Joined state (see ConsumerEventHandler.handle_records_consumed).
+        # The consumer with consumer protocol should still be able to consume messages during rebalance.
+        self.revoked_count += 1
+        self.position = {}
+        revoked = []
+
+        for topic_partition in event["partitions"]:
+            tp = _create_partition_from_dict(topic_partition)
+            # tp existing in self.assignment is not guaranteed in the new consumer
+            # if it shuts down when revoking partitions for reconciliation.
+            if tp in self.assignment:
+                self.assignment.remove(tp)
+            revoked.append(tp)
+
+        logger.debug("Partitions %s revoked from %s" % (revoked, node.account.hostname))
 
 class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
     """This service wraps org.apache.kafka.tools.VerifiableConsumer for use in
@@ -245,13 +269,30 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
     def java_class_name(self):
         return "VerifiableConsumer"
 
+    def create_event_handler(self, idx, node):
+        def create_handler_helper(handler_class, node, idx, existing_handler=None):
+            if existing_handler is not None:
+                return handler_class(node, self.verify_offsets, idx,
+                                     state=existing_handler.state,
+                                     revoked_count=existing_handler.revoked_count,
+                                     assigned_count=existing_handler.assigned_count,
+                                     assignment=existing_handler.assignment,
+                                     position=existing_handler.position,
+                                     committed=existing_handler.committed,
+                                     total_consumed=existing_handler.total_consumed)
+            else:
+                return handler_class(node, self.verify_offsets, idx)
+        existing_handler = self.event_handlers[node] if node in self.event_handlers else None
+        if self.is_consumer_group_protocol_enabled():
+            return create_handler_helper(ConsumerProtocolConsumerEventHandler, node, idx, existing_handler)
+        elif self.is_eager():
+            return create_handler_helper(ConsumerEventHandler, node, idx, existing_handler)
+        else:
+            return create_handler_helper(IncrementalAssignmentConsumerEventHandler, node, idx, existing_handler)
+
     def _worker(self, idx, node):
         with self.lock:
-            if node not in self.event_handlers:
-                if self.is_eager():
-                    self.event_handlers[node] = ConsumerEventHandler(node, self.verify_offsets, idx)
-                else:
-                    self.event_handlers[node] = IncrementalAssignmentConsumerEventHandler(node, self.verify_offsets, idx)
+            self.event_handlers[node] = self.create_event_handler(idx, node)
             handler = self.event_handlers[node]
 
         node.account.ssh("mkdir -p %s" % VerifiableConsumer.PERSISTENT_ROOT, allow_fail=False)
@@ -481,4 +522,4 @@ class VerifiableConsumer(KafkaPathResolverMixin, VerifiableClientMixin, Backgrou
                     if handler.state != ConsumerState.Dead]
 
     def is_consumer_group_protocol_enabled(self):
-        return self.group_protocol and self.group_protocol.upper() == "CONSUMER"
+        return self.group_protocol and self.group_protocol.lower() == consumer_group.consumer_group_protocol
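
The behavioral core of this file's change: `_worker` now rebuilds the event handler on every (re)start, choosing a protocol-appropriate class while carrying the previous handler's progress forward through the new constructor arguments. A minimal sketch of that carry-over using the constructors from the diff (the string node name and counter values are placeholders):

```python
# Before a bounce: an eager-protocol handler with accumulated progress.
old = ConsumerEventHandler("worker1", True, 1)
old.total_consumed = 500
old.revoked_count = 2

# After a bounce onto the new protocol, create_event_handler effectively
# does this: a new handler class, the same accumulated progress.
new = ConsumerProtocolConsumerEventHandler(
    "worker1", True, 1,
    state=old.state, revoked_count=old.revoked_count,
    assigned_count=old.assigned_count, assignment=old.assignment,
    position=old.position, committed=old.committed,
    total_consumed=old.total_consumed)
assert new.total_consumed == 500 and new.revoked_count == 2
```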
diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index fed00b0aac2..6cb82869c4f 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -116,6 +116,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
 
         consumer.start()
         self.await_all_members(consumer)
+        self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
 
         num_rebalances = consumer.num_rebalances()
         # TODO: make this test work with hard shutdowns, which probably requires
@@ -204,7 +205,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         num_bounces=[5],
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[True],
-        group_protocol=consumer_group.classic_group_protocol
+        group_protocol=[consumer_group.classic_group_protocol]
     )
     def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
         """
@@ -349,6 +350,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         if fencing_stage == "stable":
             consumer.start()
             self.await_members(consumer, len(consumer.nodes))
+            self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=120)
 
             num_rebalances = consumer.num_rebalances()
             conflict_consumer.start()
@@ -415,9 +417,8 @@ class OffsetValidationTest(VerifiableConsumerTest):
 
         consumer.start()
         self.await_all_members(consumer)
-
+        self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
         partition_owner = consumer.owner(partition)
-        assert partition_owner is not None
 
         # startup the producer and ensure that some records have been written
         producer.start()
@@ -481,6 +482,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         producer.start()
         consumer.start()
         self.await_all_members(consumer)
+        self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS, consumer, timeout_sec=60)
 
         num_rebalances = consumer.num_rebalances()
 
@@ -491,7 +493,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         # ensure that the consumers do some work after the broker failure
         self.await_consumed_messages(consumer, min_messages=1000)
 
-        # verify that there were no rebalances on failover
+        # verify that there were no rebalances on failover.
         assert num_rebalances == consumer.num_rebalances(), "Broker failure should not cause a rebalance"
 
         consumer.stop_all()
@@ -608,7 +610,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
             consumer.start_node(node)
             self.await_members(consumer, num_started)
             wait_until(lambda: self.valid_assignment(self.TOPIC, self.NUM_PARTITIONS, consumer.current_assignment()),
-                timeout_sec=15,
+                timeout_sec=30,
                 err_msg="expected valid assignments of %d partitions when num_started %d: %s" % \
                         (self.NUM_PARTITIONS, num_started, \
                          [(str(node.account), a) for node, a in consumer.current_assignment().items()]))
diff --git a/tests/kafkatest/tests/verifiable_consumer_test.py b/tests/kafkatest/tests/verifiable_consumer_test.py
index 38bb6cf3bd9..08da754732d 100644
--- a/tests/kafkatest/tests/verifiable_consumer_test.py
+++ b/tests/kafkatest/tests/verifiable_consumer_test.py
@@ -86,3 +86,12 @@ class VerifiableConsumerTest(KafkaTest):
         
     def await_all_members(self, consumer):
         self.await_members(consumer, self.num_consumers)
+
+    def await_all_members_stabilized(self, topic, num_partitions, consumer, timeout_sec):
+        # Wait until the group is in STABLE state and the consumers reconcile to a valid assignment
+        wait_until(lambda: self.group_id in self.kafka.list_consumer_groups(state="stable"),
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for group %s to transition to STABLE state." % self.group_id)
+        wait_until(lambda: self.valid_assignment(topic, num_partitions, consumer.current_assignment()),
+                   timeout_sec=timeout_sec,
+                   err_msg="Timeout awaiting for the consumers to reconcile to a valid assignment.")
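
The two-phase wait is deliberate: the first check confirms the broker-side group state via the consumer-groups CLI, while the second confirms the client-side assignment, since the two views can disagree briefly during reconciliation. The sampling pattern the three JIRA fixes rely on then looks like this sketch (the surrounding test, topic constants, and `partition` variable follow consumer_test.py and are assumptions here):

```python
consumer.start()
self.await_all_members(consumer)
# Only sample rebalance counts and partition owners once the group is
# STABLE and the assignment is valid; sampling earlier is what made
# KAFKA-16576 / KAFKA-17219 / KAFKA-17295 flaky.
self.await_all_members_stabilized(self.TOPIC, self.NUM_PARTITIONS,
                                  consumer, timeout_sec=60)
num_rebalances = consumer.num_rebalances()
partition_owner = consumer.owner(partition)
```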
