This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 45e3c21e9a8 KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)
45e3c21e9a8 is described below

commit 45e3c21e9a8c579190f22712705af26a65cbd3e3
Author: Kirk True <k...@kirktrue.pro>
AuthorDate: Tue Jan 14 05:47:15 2025 -0800

    KAFKA-17915: Convert Kafka Client system tests to use KRaft (#17669)

    Reviewers: Lianet Magrans <lmagr...@confluent.io>, Chia-Ping Tsai <chia7...@gmail.com>
---
 tests/kafkatest/tests/client/consumer_test.py        | 20 ++++++++++----------
 .../org/apache/kafka/tools/VerifiableConsumer.java   |  6 +++---
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/tests/kafkatest/tests/client/consumer_test.py b/tests/kafkatest/tests/client/consumer_test.py
index bc35cab220f..5e5c5ff308d 100644
--- a/tests/kafkatest/tests/client/consumer_test.py
+++ b/tests/kafkatest/tests/client/consumer_test.py
@@ -84,7 +84,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_broker_rolling_bounce(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_broker_rolling_bounce(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verify correct consumer behavior when the brokers are consecutively restarted.
 
@@ -143,7 +143,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consumer_bounce(self, clean_shutdown, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verify correct consumer behavior when the consumers in the group are consecutively restarted.
 
@@ -202,7 +202,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=[consumer_group.classic_group_protocol]
     )
-    def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_static_consumer_bounce_with_eager_assignment(self, clean_shutdown, static_membership, bounce_mode, num_bounces, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verify correct static consumer behavior when the consumers in the group are restarted. In order to make sure
         the behavior of static members are different from dynamic ones, we take both static and dynamic
@@ -275,7 +275,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_static_consumer_persisted_after_rejoin(self, bounce_mode, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verify that the updated member.id(updated_member_id) caused by static member rejoin would be persisted. If not, after
         the brokers rolling bounce, the migrated group coordinator would load the stale persisted member.id and
@@ -317,7 +317,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_fencing_static_consumer(self, num_conflict_consumers, fencing_stage, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verify correct static consumer behavior when there are conflicting consumers with same group.instance.id.
 
@@ -401,7 +401,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_consumer_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -459,7 +459,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_broker_failure(self, clean_shutdown, enable_autocommit, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         partition = TopicPartition(self.TOPIC, 0)
 
         consumer = self.setup_consumer(self.TOPIC, enable_autocommit=enable_autocommit, group_protocol=group_protocol)
@@ -505,7 +505,7 @@ class OffsetValidationTest(VerifiableConsumerTest):
         use_new_coordinator=[True],
         group_protocol=consumer_group.all_group_protocols
     )
-    def test_group_consumption(self, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None):
+    def test_group_consumption(self, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None):
         """
         Verifies correct group rebalance behavior as consumers are started and stopped.
         In particular, this test verifies that the partition is readable after every
@@ -570,7 +570,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
                              "org.apache.kafka.clients.consumer.CooperativeStickyAssignor"],
         metadata_quorum=[quorum.isolated_kraft],
         use_new_coordinator=[True],
-        group_protocol=[consumer_group.classic_group_protocol],
+        group_protocol=[consumer_group.classic_group_protocol]
     )
     @matrix(
         metadata_quorum=[quorum.isolated_kraft],
@@ -578,7 +578,7 @@ class AssignmentValidationTest(VerifiableConsumerTest):
         group_protocol=[consumer_group.consumer_group_protocol],
         group_remote_assignor=consumer_group.all_remote_assignors
     )
-    def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.zk, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
+    def test_valid_assignment(self, assignment_strategy=None, metadata_quorum=quorum.isolated_kraft, use_new_coordinator=False, group_protocol=None, group_remote_assignor=None):
         """
         Verify assignment strategy correctness: each partition is assigned to exactly one consumer instance.
 
diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
index 825e5ed2c6e..0da56f1340b 100644
--- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
+++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
@@ -634,15 +634,15 @@ public class VerifiableConsumer implements Closeable, OffsetCommitCallback, Cons
             }
         }
 
-        String groupProtocol = res.getString("groupProtocol");
+        GroupProtocol groupProtocol = GroupProtocol.of(res.getString("groupProtocol"));
+        consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol.name());
 
         // 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.
         // The two implementations use slightly different configuration, hence these arguments are conditional.
         //
         // See the Python class/method VerifiableConsumer.start_cmd() in verifiable_consumer.py for how the
         // command line arguments are passed in by the system test framework.
-        if (groupProtocol.equalsIgnoreCase(GroupProtocol.CONSUMER.name())) {
-            consumerProps.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol);
+        if (groupProtocol == GroupProtocol.CONSUMER) {
             String groupRemoteAssignor = res.getString("groupRemoteAssignor");
 
             if (groupRemoteAssignor != null)