This is an automated email from the ASF dual-hosted git repository.
lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25fdcd05fc4 KAFKA-17915: Convert remaining Kafka Client system tests
to use KRaft (#18367)
25fdcd05fc4 is described below
commit 25fdcd05fc488e49bd1f077d1c464e686d9884e2
Author: kevin-wu24 <[email protected]>
AuthorDate: Thu Jan 9 12:20:34 2025 -0600
KAFKA-17915: Convert remaining Kafka Client system tests to use KRaft
(#18367)
Reviewers: Lianet Magrans <[email protected]>
---
.../client/client_compatibility_features_test.py | 18 +++++++------
.../client_compatibility_produce_consume_test.py | 31 +++++++++++++++-------
tests/kafkatest/tests/client/quota_test.py | 18 +++++--------
3 files changed, 37 insertions(+), 30 deletions(-)
diff --git a/tests/kafkatest/tests/client/client_compatibility_features_test.py
b/tests/kafkatest/tests/client/client_compatibility_features_test.py
index 1e0c37825d1..dcb7146a298 100644
--- a/tests/kafkatest/tests/client/client_compatibility_features_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_features_test.py
@@ -53,7 +53,6 @@ def run_command(node, cmd, ssh_log_file):
print(e, flush=True)
raise
-
class ClientCompatibilityFeaturesTest(Test):
"""
Tests clients for the presence or absence of specific features when
communicating with brokers with various
@@ -118,17 +117,20 @@ class ClientCompatibilityFeaturesTest(Test):
@parametrize(broker_version=str(LATEST_3_0))
@parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2))
- @parametrize(broker_version=str(LATEST_3_3))
- @parametrize(broker_version=str(LATEST_3_4))
- @parametrize(broker_version=str(LATEST_3_5))
- @parametrize(broker_version=str(LATEST_3_6))
- @parametrize(broker_version=str(LATEST_3_7))
- @parametrize(broker_version=str(LATEST_3_8))
- @parametrize(broker_version=str(LATEST_3_9))
+ @parametrize(broker_version=str(LATEST_3_3),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_4),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_5),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_6),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_7),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_8),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_9),
metadata_quorum=quorum.isolated_kraft)
def run_compatibility_test(self, broker_version,
metadata_quorum=quorum.zk):
if self.zk:
self.zk.start()
self.kafka.set_version(KafkaVersion(broker_version))
+ if metadata_quorum == quorum.isolated_kraft:
+ for node in self.kafka.controller_quorum.nodes:
+ node.version = KafkaVersion(broker_version)
self.kafka.start()
features = get_broker_features(broker_version)
self.invoke_compatibility_program(features)
diff --git
a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
index 1209deed64f..4c7bfef158f 100644
--- a/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
+++ b/tests/kafkatest/tests/client/client_compatibility_produce_consume_test.py
@@ -38,9 +38,17 @@ class
ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=3) if
quorum.for_test(test_context) == quorum.zk else None
- self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk,
topics={self.topic:{
-
"partitions": 10,
-
"replication-factor": 2}})
+ self.kafka = KafkaService(
+ test_context,
+ num_nodes=3,
+ zk=self.zk,
+ topics={
+ self.topic:{
+ "partitions": 10,
+ "replication-factor": 2
+ }
+ },
+ )
self.num_partitions = 10
self.timeout_sec = 60
self.producer_throughput = 1000
@@ -69,16 +77,19 @@ class
ClientCompatibilityProduceConsumeTest(ProduceConsumeValidateTest):
@parametrize(broker_version=str(LATEST_3_0))
@parametrize(broker_version=str(LATEST_3_1))
@parametrize(broker_version=str(LATEST_3_2))
- @parametrize(broker_version=str(LATEST_3_3))
- @parametrize(broker_version=str(LATEST_3_4))
- @parametrize(broker_version=str(LATEST_3_5))
- @parametrize(broker_version=str(LATEST_3_6))
- @parametrize(broker_version=str(LATEST_3_7))
- @parametrize(broker_version=str(LATEST_3_8))
- @parametrize(broker_version=str(LATEST_3_9))
+ @parametrize(broker_version=str(LATEST_3_3),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_4),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_5),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_6),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_7),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_8),
metadata_quorum=quorum.isolated_kraft)
+ @parametrize(broker_version=str(LATEST_3_9),
metadata_quorum=quorum.isolated_kraft)
def test_produce_consume(self, broker_version, metadata_quorum=quorum.zk):
print("running producer_consumer_compat with broker_version = %s" %
broker_version, flush=True)
self.kafka.set_version(KafkaVersion(broker_version))
+ if metadata_quorum == quorum.isolated_kraft:
+ for node in self.kafka.controller_quorum.nodes:
+ node.version = KafkaVersion(broker_version)
self.kafka.security_protocol = "PLAINTEXT"
self.kafka.interbroker_security_protocol = self.kafka.security_protocol
self.producer = VerifiableProducer(self.test_context,
self.num_producers, self.kafka,
diff --git a/tests/kafkatest/tests/client/quota_test.py
b/tests/kafkatest/tests/client/quota_test.py
index d52f9b6a944..e89fea80eee 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -17,8 +17,7 @@ from ducktape.tests.test import Test
from ducktape.mark import matrix, parametrize
from ducktape.mark.resource import cluster
-from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.services.kafka import KafkaService
+from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.version import DEV_BRANCH
@@ -77,10 +76,9 @@ class QuotaConfig(object):
self.configure_quota(kafka, QuotaConfig.LARGE_QUOTA,
QuotaConfig.LARGE_QUOTA, ['clients', None])
def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate,
entity_args):
- force_use_zk_connection = not
kafka.all_nodes_configs_command_uses_bootstrap_server()
node = kafka.nodes[0]
cmd = "%s --alter --add-config
producer_byte_rate=%d,consumer_byte_rate=%d" % \
- (kafka.kafka_configs_cmd_with_optional_security_settings(node,
force_use_zk_connection), producer_byte_rate, consumer_byte_rate)
+ (kafka.kafka_configs_cmd_with_optional_security_settings(node,
False), producer_byte_rate, consumer_byte_rate)
cmd += " --entity-type " + entity_args[0] +
self.entity_name_opt(entity_args[1])
if len(entity_args) > 2:
cmd += " --entity-type " + entity_args[2] +
self.entity_name_opt(entity_args[3])
@@ -108,8 +106,7 @@ class QuotaTest(Test):
self.num_records = 50000
self.record_size = 3000
- self.zk = ZookeeperService(test_context, num_nodes=1)
- self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
+ self.kafka = KafkaService(test_context, num_nodes=1, zk=None,
security_protocol='SSL',
authorizer_class_name='',
interbroker_security_protocol='SSL',
topics={self.topic: {'partitions': 6,
'replication-factor': 1, 'configs': {'min.insync.replicas': 1}}},
@@ -119,17 +116,14 @@ class QuotaTest(Test):
self.num_producers = 1
self.num_consumers = 2
- def setUp(self):
- self.zk.start()
-
def min_cluster_size(self):
"""Override this since we're adding services outside of the
constructor"""
return super(QuotaTest, self).min_cluster_size() + self.num_producers
+ self.num_consumers
@cluster(num_nodes=5)
- @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER,
QuotaConfig.USER_CLIENT], override_quota=[True, False])
- @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2)
- def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1):
+ @matrix(quota_type=[QuotaConfig.CLIENT_ID, QuotaConfig.USER,
QuotaConfig.USER_CLIENT], override_quota=[True, False],
metadata_quorum=[quorum.isolated_kraft])
+ @parametrize(quota_type=QuotaConfig.CLIENT_ID, consumer_num=2,
metadata_quorum=quorum.isolated_kraft)
+ def test_quota(self, quota_type, override_quota=True, producer_num=1,
consumer_num=1, metadata_quorum=quorum.isolated_kraft):
self.kafka.start()
self.quota_config = QuotaConfig(quota_type, override_quota, self.kafka)