This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ff4d9510277 KAFKA-17715 Remove argument force_use_zk_connection from kafka_acls_cmd_with_optional_security_settings (#19209) ff4d9510277 is described below commit ff4d9510277c474a4be17f95fade9c0c479ec5ab Author: Ming-Yen Chung <mingyen...@gmail.com> AuthorDate: Wed Jul 9 17:07:56 2025 +0800 KAFKA-17715 Remove argument force_use_zk_connection from kafka_acls_cmd_with_optional_security_settings (#19209) The e2e tests currently cover version 2.1.0 and above. Thus, we can remove `force_use_zk_connection` in `kafka_acls_cmd_with_optional_security_settings`. In contrast, the `force_use_zk_connection` in `kafka_topics_cmd_with_optional_security_settings` and `kafka_configs_cmd_with_optional_security_settings` still needs to be kept as `kafka-topics.sh` does not support `--bootstrap-server` in 2.1 and 2.2. e2e test result: ``` =========================================== SESSION REPORT (ALL TESTS) ducktape version: 0.12.0 session_id: 2025-07-02--001 run time: 200 minutes 28.399 seconds tests run: 90 passed: 90 flaky: 0 failed: 0 ignored: 0 =========================================== ``` Reviewers: Ken Huang <s7133...@gmail.com>, TengYao Chi <kiting...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- tests/kafkatest/services/kafka/kafka.py | 37 +++++++++---------------- tests/kafkatest/services/security/kafka_acls.py | 8 ++---- tests/kafkatest/tests/client/quota_test.py | 2 +- tests/kafkatest/tests/core/authorizer_test.py | 5 ++-- tests/kafkatest/version.py | 3 -- 5 files changed, 20 insertions(+), 35 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index ff106425005..c80197c3a5a 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -1188,12 +1188,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): return False return True - def
all_nodes_acl_command_supports_bootstrap_server(self): - for node in self.nodes: - if not node.version.acl_command_supports_bootstrap_server(): - return False - return True - def all_nodes_reassign_partitions_command_supports_bootstrap_server(self): for node in self.nodes: if not node.version.reassign_partitions_command_supports_bootstrap_server(): @@ -1350,30 +1344,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.logger.info("Running alter message format command...\n%s" % cmd) node.account.ssh(cmd) - def kafka_acls_cmd_with_optional_security_settings(self, node, force_use_zk_connection, kafka_security_protocol = None, override_command_config = None): + def kafka_acls_cmd_with_optional_security_settings(self, node, kafka_security_protocol = None, override_command_config = None): if self.quorum_info.using_kraft and not self.quorum_info.has_brokers: raise Exception("Must invoke kafka-acls against a broker, not a KRaft controller") - force_use_zk_connection = force_use_zk_connection or not self.all_nodes_acl_command_supports_bootstrap_server - if force_use_zk_connection: - bootstrap_server_or_authorizer_zk_props = "--authorizer-properties zookeeper.connect=%s" % (self.zk_connect_setting()) - skip_optional_security_settings = True - else: - if kafka_security_protocol is None: - # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, - # otherwise use the client security protocol - if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: - security_protocol_to_use = SecurityConfig.PLAINTEXT - else: - security_protocol_to_use = self.security_protocol + if kafka_security_protocol is None: + # it wasn't specified, so use the inter-broker security protocol if it is PLAINTEXT, + # otherwise use the client security protocol + if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT: + security_protocol_to_use = SecurityConfig.PLAINTEXT else: - security_protocol_to_use = kafka_security_protocol - 
bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) - skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT + security_protocol_to_use = self.security_protocol + else: + security_protocol_to_use = kafka_security_protocol + bootstrap_server = "--bootstrap-server %s" % (self.bootstrap_servers(security_protocol_to_use)) + skip_optional_security_settings = security_protocol_to_use == SecurityConfig.PLAINTEXT if skip_optional_security_settings: optional_jass_krb_system_props_prefix = "" optional_command_config_suffix = "" else: - # we need security configs because aren't going to ZooKeeper and we aren't using PLAINTEXT + # we need security configs because we aren't using PLAINTEXT if (security_protocol_to_use == self.interbroker_security_protocol): # configure JAAS to provide the broker's credentials # since this is an authenticating cluster and we are going to use the inter-broker security protocol @@ -1393,7 +1382,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): kafka_acls_script = self.path.script("kafka-acls.sh", node) return "%s%s %s%s" % \ (optional_jass_krb_system_props_prefix, kafka_acls_script, - bootstrap_server_or_authorizer_zk_props, optional_command_config_suffix) + bootstrap_server, optional_command_config_suffix) def run_cli_tool(self, node, cmd): output = "" diff --git a/tests/kafkatest/services/security/kafka_acls.py b/tests/kafkatest/services/security/kafka_acls.py index 5bd1a46f597..8c6f946b109 100644 --- a/tests/kafkatest/services/security/kafka_acls.py +++ b/tests/kafkatest/services/security/kafka_acls.py @@ -19,13 +19,11 @@ class ACLs: def __init__(self, context): self.context = context - def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, additional_cluster_operations_to_grant = [], security_protocol=None): + def add_cluster_acl(self, kafka, principal, additional_cluster_operations_to_grant = [], 
security_protocol=None): """ :param kafka: Kafka cluster upon which ClusterAction ACL is created :param principal: principal for which ClusterAction ACL is created :param node: Node to use when determining connection settings - :param force_use_zk_connection: forces the use of ZooKeeper when true, otherwise AdminClient is used when available. - This is necessary for the case where we are bootstrapping ACLs before Kafka is started or before authorizer is enabled :param additional_cluster_operations_to_grant may be set to ['Alter', 'Create'] if the cluster is secured since these are required to create SCRAM credentials and topics, respectively :param security_protocol set it to explicitly determine whether we use client or broker credentials, otherwise @@ -37,7 +35,7 @@ class ACLs: for operation in ['ClusterAction'] + additional_cluster_operations_to_grant: cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, force_use_zk_connection, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } @@ -59,7 +57,7 @@ class ACLs: for operation in ['ClusterAction'] + additional_cluster_operations_to_remove: cmd = "%(cmd_prefix)s --remove --force --cluster --operation=%(operation)s --allow-principal=%(principal)s" % { - 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, False, security_protocol), + 'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol), 'operation': operation, 'principal': principal } diff --git a/tests/kafkatest/tests/client/quota_test.py b/tests/kafkatest/tests/client/quota_test.py index e89fea80eee..36d4eca08fd 100644 --- a/tests/kafkatest/tests/client/quota_test.py +++ b/tests/kafkatest/tests/client/quota_test.py @@ -78,7 +78,7 @@ class QuotaConfig(object): def 
configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, entity_args): node = kafka.nodes[0] cmd = "%s --alter --add-config producer_byte_rate=%d,consumer_byte_rate=%d" % \ - (kafka.kafka_configs_cmd_with_optional_security_settings(node, False), producer_byte_rate, consumer_byte_rate) + (kafka.kafka_configs_cmd_with_optional_security_settings(node, force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate) cmd += " --entity-type " + entity_args[0] + self.entity_name_opt(entity_args[1]) if len(entity_args) > 2: cmd += " --entity-type " + entity_args[2] + self.entity_name_opt(entity_args[3]) diff --git a/tests/kafkatest/tests/core/authorizer_test.py b/tests/kafkatest/tests/core/authorizer_test.py index 60c0612f356..260b16b1f6f 100644 --- a/tests/kafkatest/tests/core/authorizer_test.py +++ b/tests/kafkatest/tests/core/authorizer_test.py @@ -98,8 +98,9 @@ class AuthorizerTest(Test): # add ACLs self.logger.info("Adding ACLs with broker credentials so that alter client quotas command will succeed") - self.acls.add_cluster_acl(self.kafka, client_principal, force_use_zk_connection=False, - additional_cluster_operations_to_grant=['AlterConfigs'], security_protocol=broker_security_protocol) + self.acls.add_cluster_acl(self.kafka, client_principal, + additional_cluster_operations_to_grant=['AlterConfigs'], + security_protocol=broker_security_protocol) # the alter client quotas command should now succeed again self.logger.info(alter_client_quotas_cmd_log_msg) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index cc442bc4bbd..16f3169d500 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -62,9 +62,6 @@ class KafkaVersion(LooseVersion): return LooseVersion._cmp(self, other) - def acl_command_supports_bootstrap_server(self): - return self >= V_2_1_0 - def topic_command_supports_bootstrap_server(self): return self >= V_2_3_0