This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ff4d9510277 KAFKA-17715 Remove argument force_use_zk_connection from 
kafka_acls_cmd_with_optional_security_settings (#19209)
ff4d9510277 is described below

commit ff4d9510277c474a4be17f95fade9c0c479ec5ab
Author: Ming-Yen Chung <mingyen...@gmail.com>
AuthorDate: Wed Jul 9 17:07:56 2025 +0800

    KAFKA-17715 Remove argument force_use_zk_connection from 
kafka_acls_cmd_with_optional_security_settings (#19209)
    
    The e2e tests currently cover only versions 2.1.0 and above, and
    `kafka-acls.sh` supports `--bootstrap-server` in all of those
    versions. Thus, we can remove the `force_use_zk_connection` argument
    from `kafka_acls_cmd_with_optional_security_settings`.
    
    In contrast, the `force_use_zk_connection` argument of
    `kafka_topics_cmd_with_optional_security_settings` and
    `kafka_configs_cmd_with_optional_security_settings` must still be
    kept, because `kafka-topics.sh` does not support `--bootstrap-server`
    in versions 2.1 and 2.2.
    
    e2e test result:
    ```
    ===========================================
    SESSION REPORT (ALL TESTS)
    ducktape version: 0.12.0
    session_id:       2025-07-02--001
    run time:         200 minutes 28.399 seconds
    tests run:        90
    passed:           90
    flaky:            0
    failed:           0
    ignored:          0
    ===========================================
    ```
    
    Reviewers: Ken Huang <s7133...@gmail.com>, TengYao Chi
    <kiting...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 tests/kafkatest/services/kafka/kafka.py         | 37 +++++++++----------------
 tests/kafkatest/services/security/kafka_acls.py |  8 ++----
 tests/kafkatest/tests/client/quota_test.py      |  2 +-
 tests/kafkatest/tests/core/authorizer_test.py   |  5 ++--
 tests/kafkatest/version.py                      |  3 --
 5 files changed, 20 insertions(+), 35 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index ff106425005..c80197c3a5a 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -1188,12 +1188,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
                 return False
         return True
 
-    def all_nodes_acl_command_supports_bootstrap_server(self):
-        for node in self.nodes:
-            if not node.version.acl_command_supports_bootstrap_server():
-                return False
-        return True
-
     def all_nodes_reassign_partitions_command_supports_bootstrap_server(self):
         for node in self.nodes:
             if not 
node.version.reassign_partitions_command_supports_bootstrap_server():
@@ -1350,30 +1344,25 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-    def kafka_acls_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol = None, 
override_command_config = None):
+    def kafka_acls_cmd_with_optional_security_settings(self, node, 
kafka_security_protocol = None, override_command_config = None):
         if self.quorum_info.using_kraft and not self.quorum_info.has_brokers:
             raise Exception("Must invoke kafka-acls against a broker, not a 
KRaft controller")
-        force_use_zk_connection = force_use_zk_connection or not 
self.all_nodes_acl_command_supports_bootstrap_server
-        if force_use_zk_connection:
-            bootstrap_server_or_authorizer_zk_props = "--authorizer-properties 
zookeeper.connect=%s" % (self.zk_connect_setting())
-            skip_optional_security_settings = True
-        else:
-            if kafka_security_protocol is None:
-                # it wasn't specified, so use the inter-broker security 
protocol if it is PLAINTEXT,
-                # otherwise use the client security protocol
-                if self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT:
-                    security_protocol_to_use = SecurityConfig.PLAINTEXT
-                else:
-                    security_protocol_to_use = self.security_protocol
+        if kafka_security_protocol is None:
+            # it wasn't specified, so use the inter-broker security protocol 
if it is PLAINTEXT,
+            # otherwise use the client security protocol
+            if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                security_protocol_to_use = SecurityConfig.PLAINTEXT
             else:
-                security_protocol_to_use = kafka_security_protocol
-            bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" 
% (self.bootstrap_servers(security_protocol_to_use))
-            skip_optional_security_settings = security_protocol_to_use == 
SecurityConfig.PLAINTEXT
+                security_protocol_to_use = self.security_protocol
+        else:
+            security_protocol_to_use = kafka_security_protocol
+        bootstrap_server = "--bootstrap-server %s" % 
(self.bootstrap_servers(security_protocol_to_use))
+        skip_optional_security_settings = security_protocol_to_use == 
SecurityConfig.PLAINTEXT
         if skip_optional_security_settings:
             optional_jass_krb_system_props_prefix = ""
             optional_command_config_suffix = ""
         else:
-            # we need security configs because aren't going to ZooKeeper and 
we aren't using PLAINTEXT
+            # we need security configs because we aren't using PLAINTEXT
             if (security_protocol_to_use == 
self.interbroker_security_protocol):
                 # configure JAAS to provide the broker's credentials
                 # since this is an authenticating cluster and we are going to 
use the inter-broker security protocol
@@ -1393,7 +1382,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         kafka_acls_script = self.path.script("kafka-acls.sh", node)
         return "%s%s %s%s" % \
                (optional_jass_krb_system_props_prefix, kafka_acls_script,
-                bootstrap_server_or_authorizer_zk_props, 
optional_command_config_suffix)
+                bootstrap_server, optional_command_config_suffix)
 
     def run_cli_tool(self, node, cmd):
         output = ""
diff --git a/tests/kafkatest/services/security/kafka_acls.py 
b/tests/kafkatest/services/security/kafka_acls.py
index 5bd1a46f597..8c6f946b109 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -19,13 +19,11 @@ class ACLs:
     def __init__(self, context):
         self.context = context
 
-    def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, 
additional_cluster_operations_to_grant = [], security_protocol=None):
+    def add_cluster_acl(self, kafka, principal, 
additional_cluster_operations_to_grant = [], security_protocol=None):
         """
         :param kafka: Kafka cluster upon which ClusterAction ACL is created
         :param principal: principal for which ClusterAction ACL is created
         :param node: Node to use when determining connection settings
-        :param force_use_zk_connection: forces the use of ZooKeeper when true, 
otherwise AdminClient is used when available.
-               This is necessary for the case where we are bootstrapping ACLs 
before Kafka is started or before authorizer is enabled
         :param additional_cluster_operations_to_grant may be set to ['Alter', 
'Create'] if the cluster is secured since these are required
                to create SCRAM credentials and topics, respectively
         :param security_protocol set it to explicitly determine whether we use 
client or broker credentials, otherwise
@@ -37,7 +35,7 @@ class ACLs:
 
         for operation in ['ClusterAction'] + 
additional_cluster_operations_to_grant:
             cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s 
--allow-principal=%(principal)s" % {
-                'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, 
force_use_zk_connection, security_protocol),
+                'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
                 'operation': operation,
                 'principal': principal
             }
@@ -59,7 +57,7 @@ class ACLs:
 
         for operation in ['ClusterAction'] + 
additional_cluster_operations_to_remove:
             cmd = "%(cmd_prefix)s --remove --force --cluster 
--operation=%(operation)s --allow-principal=%(principal)s" % {
-                'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, False, 
security_protocol),
+                'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(node, security_protocol),
                 'operation': operation,
                 'principal': principal
             }
diff --git a/tests/kafkatest/tests/client/quota_test.py 
b/tests/kafkatest/tests/client/quota_test.py
index e89fea80eee..36d4eca08fd 100644
--- a/tests/kafkatest/tests/client/quota_test.py
+++ b/tests/kafkatest/tests/client/quota_test.py
@@ -78,7 +78,7 @@ class QuotaConfig(object):
     def configure_quota(self, kafka, producer_byte_rate, consumer_byte_rate, 
entity_args):
         node = kafka.nodes[0]
         cmd = "%s --alter --add-config 
producer_byte_rate=%d,consumer_byte_rate=%d" % \
-              (kafka.kafka_configs_cmd_with_optional_security_settings(node, 
False), producer_byte_rate, consumer_byte_rate)
+              (kafka.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection=False), producer_byte_rate, consumer_byte_rate)
         cmd += " --entity-type " + entity_args[0] + 
self.entity_name_opt(entity_args[1])
         if len(entity_args) > 2:
             cmd += " --entity-type " + entity_args[2] + 
self.entity_name_opt(entity_args[3])
diff --git a/tests/kafkatest/tests/core/authorizer_test.py 
b/tests/kafkatest/tests/core/authorizer_test.py
index 60c0612f356..260b16b1f6f 100644
--- a/tests/kafkatest/tests/core/authorizer_test.py
+++ b/tests/kafkatest/tests/core/authorizer_test.py
@@ -98,8 +98,9 @@ class AuthorizerTest(Test):
 
         # add ACLs
         self.logger.info("Adding ACLs with broker credentials so that alter 
client quotas command will succeed")
-        self.acls.add_cluster_acl(self.kafka, client_principal, 
force_use_zk_connection=False,
-                                  
additional_cluster_operations_to_grant=['AlterConfigs'], 
security_protocol=broker_security_protocol)
+        self.acls.add_cluster_acl(self.kafka, client_principal,
+                                  
additional_cluster_operations_to_grant=['AlterConfigs'],
+                                  security_protocol=broker_security_protocol)
 
         # the alter client quotas command should now succeed again
         self.logger.info(alter_client_quotas_cmd_log_msg)
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index cc442bc4bbd..16f3169d500 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -62,9 +62,6 @@ class KafkaVersion(LooseVersion):
 
         return LooseVersion._cmp(self, other)
 
-    def acl_command_supports_bootstrap_server(self):
-        return self >= V_2_1_0
-
     def topic_command_supports_bootstrap_server(self):
         return self >= V_2_3_0
 

Reply via email to