This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new bc782d0fb76 KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
bc782d0fb76 is described below

commit bc782d0fb763fc9ee22cb85684e227adfd081f09
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Tue Jan 14 05:47:15 2025 -0800

    KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
    
    Reviewers: Lianet Magrans <lmagr...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>
---
 tests/kafkatest/tests/client/consumer_test.py        | 20 ++++++++++----------
 .../org/apache/kafka/tools/VerifiableConsumer.java   |  6 +++---
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/tests/kafkatest/tests/client/consumer_test.py 
b/tests/kafkatest/tests/client/consumer_test.py
index bc35cab220f..5e5c5ff308d 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -84,7 +84,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False, group_protocol=None):
+    def test_broker_rolling_bounce(self, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         """
         Verify correct consumer behavior when the brokers are consecutively 
restarted.
 
@@ -143,7 +143,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         """
         Verify correct consumer behavior when the consumers in the group are 
consecutively restarted.
 
@@ -202,7 +202,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=[consumer_group.classic_group_protocol]
     )
-    def test_static_consumer_bounce_with_eager_assignment(self, 
clean_shutdown, static_membership, bounce_mode, num_bounces, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_static_consumer_bounce_with_eager_assignment(self, 
clean_shutdown, static_membership, bounce_mode, num_bounces, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         """
         Verify correct static consumer behavior when the consumers in the 
group are restarted. In order to make 
         sure the behavior of static members are different from dynamic ones, 
we take both static and dynamic
@@ -275,7 +275,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         """
         Verify that the updated member.id(updated_member_id) caused by static 
member rejoin would be persisted. If not,
         after the brokers rolling bounce, the migrated group coordinator would 
load the stale persisted member.id and
@@ -317,7 +317,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_fencing_static_consumer(self, num_conflict_consumers, 
fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, 
group_protocol=None):
+    def test_fencing_static_consumer(self, num_conflict_consumers, 
fencing_stage, metadata_quorum=quorum.isolated_kraft, 
use_new_coordinator=False, group_protocol=None):
         """
         Verify correct static consumer behavior when there are conflicting 
consumers with same group.instance.id.
 
@@ -401,7 +401,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -459,7 +459,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_broker_failure(self, clean_shutdown, enable_autocommit, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, 
enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -505,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_group_consumption(self, metadata_quorum=quorum.zk, 
use_new_coordinator=False, group_protocol=None):
+    def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, 
use_new_coordinator=False, group_protocol=None):
         """
         Verifies correct group rebalance behavior as consumers are started and 
stopped.
         In particular, this test verifies that the partition is readable after 
every
@@ -570,7 +570,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
                              
"org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[True],
-        group_protocol=[consumer_group.classic_group_protocol],
+        group_protocol=[consumer_group.classic_group_protocol]
     )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -578,7 +578,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
         group_protocol=[consumer_group.consumer_group_protocol],
         group_remote_assignor=consumer_group.all_remote_assignors
     )
-    def test_valid_assignment(self, assignment_strategy=None, 
metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, 
group_remote_assignor=None):
+    def test_valid_assignment(self, assignment_strategy=None, 
metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, 
group_protocol=None, group_remote_assignor=None):
         """
         Verify assignment strategy correctness: each partition is assigned to 
exactly
         one consumer instance.
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java 
b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 825e5ed2c6e..0da56f1340b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -634,15 +634,15 @@ public class VerifiableConsumer implements Closeable, 
OffsetCommitCallback, Cons
             }
         }
 
-        String groupProtocol = res.getString("groupProtocol");
+        GroupProtocol groupProtocol = 
GroupProtocol.of(res.getString("groupProtocol"));
+        consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol.name());
 
         // 3.7.0 includes support for KIP-848 which introduced a new 
implementation of the consumer group protocol.
         // The two implementations use slightly different configuration, hence 
these arguments are conditional.
         //
         // See the Python class/method VerifiableConsumer.start_cmd() in 
verifiable_consumer.py for how the
         // command line arguments are passed in by the system test framework.
-        if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
-            consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, 
groupProtocol);
+        if (groupProtocol == GroupProtocol.CONSUMER) {
             String groupRemoteAssignor = res.getString("groupRemoteAssignor");
 
             if (groupRemoteAssignor != null)

Reply via email to