This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 48ed6de7150ef58e95be03ddd906aadd07a83a3c
Author: Ron Dagostino <rdagost...@confluent.io>
AuthorDate: Fri Oct 9 10:34:53 2020 -0400

    MINOR: ACLs for secured cluster system tests (#9378)
    
    This PR adds missing broker ACLs required to create topics and SCRAM 
credentials when ACLs are enabled for a system test. This PR also adds support 
for using PLAINTEXT as the inter broker security protocol when using SCRAM from 
the client in a system test with a secured cluster -- without this it would 
always be necessary to set both the inter-broker and client mechanisms to a 
SCRAM mechanism. Also contains some refactoring to make assumptions clearer.
    
    Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com>
---
 tests/kafkatest/services/kafka/kafka.py            | 232 ++++++++++++++-------
 tests/kafkatest/services/security/kafka_acls.py    |  82 +++-----
 .../kafkatest/services/security/security_config.py |  37 +---
 .../templates/admin_client_as_broker_jaas.conf     |  24 +++
 .../tests/core/zookeeper_security_upgrade_test.py  |   3 +-
 5 files changed, 223 insertions(+), 155 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index be56b85..22f0f74 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -71,6 +71,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
     INTERBROKER_LISTENER_NAME = 'INTERNAL'
     JAAS_CONF_PROPERTY = 
"java.security.auth.login.config=/mnt/security/jaas.conf"
+    ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY = 
"java.security.auth.login.config=/mnt/security/admin_client_as_broker_jaas.conf"
     KRB5_CONF = "java.security.krb5.conf=/mnt/security/krb5.conf"
 
     logs = {
@@ -372,7 +373,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         node.account.mkdirs(KafkaService.PERSISTENT_ROOT)
 
         self.security_config.setup_node(node)
-        self.security_config.maybe_setup_broker_scram_credentials(node, 
self.path, "--zookeeper %s %s" % (self.zk_connect_setting(), 
self.zk.zkTlsConfigFileOption()))
+        self.maybe_setup_broker_scram_credentials(node)
 
         prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
@@ -393,7 +394,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         # existing credentials from ZK and dynamic update of credentials in 
Kafka are tested.
         # We use the admin client and connect as the broker user when creating 
the client (non-broker) credentials
         # if Kafka supports KIP-554, otherwise we use ZooKeeper.
-        self.security_config.maybe_setup_client_scram_credentials(node, 
self.path, self._connect_setting_kafka_configs_scram(node))
+        self.maybe_setup_client_scram_credentials(node)
 
         self.start_jmx_tool(self.idx(node), node)
         if len(self.pids(node)) == 0:
@@ -445,23 +446,113 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
                                          clean_shutdown=False, allow_fail=True)
         node.account.ssh("sudo rm -rf -- %s" % KafkaService.PERSISTENT_ROOT, 
allow_fail=False)
 
-    def _kafka_topics_cmd(self, node, force_use_zk_connection):
-        """
-        Returns kafka-topics.sh command path with jaas configuration and krb5 
environment variable
-        set. If Admin client is not going to be used, don't set the 
environment variable.
-        """
+    def kafka_topics_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol = None):
+        if force_use_zk_connection:
+            bootstrap_server_or_zookeeper = "--zookeeper %s" % 
(self.zk_connect_setting())
+            skip_optional_security_settings = True
+        else:
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security 
protocol if it is PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % 
(self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == 
SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
+        else:
+            # we need security configs because we aren't going to ZooKeeper and 
we aren't using PLAINTEXT
+            if (security_protocol_to_use == 
self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to 
use the inter-broker security protocol
+                jaas_conf_prop = 
KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " 
% (jaas_conf_prop, KafkaService.KRB5_CONF)
+            optional_command_config_suffix = " --command-config <(echo '%s')" 
% (self.security_config.client_config(use_inter_broker_mechanism_for_client = 
use_inter_broker_mechanism_for_client))
         kafka_topic_script = self.path.script("kafka-topics.sh", node)
-        skip_security_settings = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server()
-        return kafka_topic_script if skip_security_settings else \
-            "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, 
KafkaService.KRB5_CONF, kafka_topic_script)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_topic_script,
+                bootstrap_server_or_zookeeper, optional_command_config_suffix)
+
+    def kafka_configs_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol = None):
+        if force_use_zk_connection:
+            # kafka-configs supports a TLS config file, so include it if there 
is one
+            bootstrap_server_or_zookeeper = "--zookeeper %s %s" % 
(self.zk_connect_setting(), self.zk.zkTlsConfigFileOption())
+            skip_optional_security_settings = True
+        else:
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security 
protocol if it is PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_zookeeper = "--bootstrap-server %s" % 
(self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == 
SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
+        else:
+            # we need security configs because we aren't going to ZooKeeper and 
we aren't using PLAINTEXT
+            if (security_protocol_to_use == 
self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to 
use the inter-broker security protocol
+                jaas_conf_prop = 
KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " 
% (jaas_conf_prop, KafkaService.KRB5_CONF)
+            optional_command_config_suffix = " --command-config <(echo '%s')" 
% (self.security_config.client_config(use_inter_broker_mechanism_for_client = 
use_inter_broker_mechanism_for_client))
+        kafka_config_script = self.path.script("kafka-configs.sh", node)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_config_script,
+                bootstrap_server_or_zookeeper, optional_command_config_suffix)
+
+    def maybe_setup_broker_scram_credentials(self, node):
+        security_config = self.security_config
+        # we only need to create broker credentials when the broker mechanism 
is SASL/SCRAM
+        if security_config.is_sasl(self.interbroker_security_protocol) and 
security_config.is_sasl_scram(self.interbroker_sasl_mechanism):
+            force_use_zk_connection = True # we are bootstrapping these 
credentials before Kafka is started
+            cmd = fix_opts_for_new_jvm(node)
+            cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type 
users --alter --add-config %(mechanism)s=[password=%(password)s]" % {
+                'kafka_configs_cmd': 
self.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection),
+                'user': SecurityConfig.SCRAM_BROKER_USER,
+                'mechanism': self.interbroker_sasl_mechanism,
+                'password': SecurityConfig.SCRAM_BROKER_PASSWORD
+            }
+            node.account.ssh(cmd)
 
-    def _kafka_topics_cmd_config(self, node, force_use_zk_connection):
-        """
-        Return --command-config parameter to the kafka-topics.sh command. The 
config parameter specifies
-        the security settings that AdminClient uses to connect to a secure 
kafka server.
-        """
-        skip_command_config = force_use_zk_connection or not 
self.all_nodes_topic_command_supports_bootstrap_server()
-        return "" if skip_command_config else " --command-config <(echo '%s')" 
% (self.security_config.client_config())
+    def maybe_setup_client_scram_credentials(self, node):
+        security_config = self.security_config
+        # we only need to create client credentials when the client mechanism 
is SASL/SCRAM
+        if security_config.is_sasl(self.security_protocol) and 
security_config.is_sasl_scram(self.client_sasl_mechanism):
+            force_use_zk_connection = not 
self.all_nodes_configs_command_uses_bootstrap_server_scram()
+            # ignored if forcing the use of Zookeeper, but we need a value to 
send, so calculate it anyway
+            if self.interbroker_security_protocol == SecurityConfig.PLAINTEXT:
+                kafka_security_protocol = self.interbroker_security_protocol
+            else:
+                kafka_security_protocol = self.security_protocol
+            cmd = fix_opts_for_new_jvm(node)
+            cmd += "%(kafka_configs_cmd)s --entity-name %(user)s --entity-type 
users --alter --add-config %(mechanism)s=[password=%(password)s]" % {
+                'kafka_configs_cmd': 
self.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection, kafka_security_protocol),
+                'user': SecurityConfig.SCRAM_CLIENT_USER,
+                'mechanism': self.client_sasl_mechanism,
+                'password': SecurityConfig.SCRAM_CLIENT_PASSWORD
+            }
+            node.account.ssh(cmd)
 
     def all_nodes_topic_command_supports_bootstrap_server(self):
         for node in self.nodes:
@@ -515,9 +606,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
                             (topic_cfg.get('if-not-exists', False) and not 
self.all_nodes_topic_command_supports_if_not_exists_with_bootstrap_server())
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%(kafka_topics_cmd)s %(connection_string)s --create --topic 
%(topic)s " % {
-            'kafka_topics_cmd': self._kafka_topics_cmd(node, 
force_use_zk_connection),
-            'connection_string': self._topic_command_connect_setting(node, 
force_use_zk_connection),
+        cmd += "%(kafka_topics_cmd)s --create --topic %(topic)s " % {
+            'kafka_topics_cmd': 
self.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection),
             'topic': topic_cfg.get("topic"),
         }
         if 'replica-assignment' in topic_cfg:
@@ -537,8 +627,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             for config_name, config_value in topic_cfg["configs"].items():
                 cmd += " --config %s=%s" % (config_name, str(config_value))
 
-        cmd += self._kafka_topics_cmd_config(node, force_use_zk_connection)
-
         self.logger.info("Running topic creation command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -556,10 +644,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         force_use_zk_connection = not 
self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --topic %s --delete %s" % \
-               (self._kafka_topics_cmd(node, force_use_zk_connection),
-                self._topic_command_connect_setting(node, 
force_use_zk_connection),
-                topic, self._kafka_topics_cmd_config(node, 
force_use_zk_connection))
+        cmd += "%s --topic %s --delete" % \
+               (self.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection), topic)
         self.logger.info("Running topic delete command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -570,10 +656,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         force_use_zk_connection = not 
self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --topic %s --describe %s" % \
-              (self._kafka_topics_cmd(node, force_use_zk_connection),
-               self._topic_command_connect_setting(node, 
force_use_zk_connection),
-               topic, self._kafka_topics_cmd_config(node, 
force_use_zk_connection))
+        cmd += "%s --topic %s --describe" % \
+               (self.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection), topic)
 
         self.logger.info("Running topic describe command...\n%s" % cmd)
         output = ""
@@ -588,9 +672,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         force_use_zk_connection = not 
self.all_nodes_topic_command_supports_bootstrap_server()
 
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --list %s" % (self._kafka_topics_cmd(node, 
force_use_zk_connection),
-                                   self._topic_command_connect_setting(node, 
force_use_zk_connection),
-                                   self._kafka_topics_cmd_config(node, 
force_use_zk_connection))
+        cmd += "%s --list" % 
(self.kafka_topics_cmd_with_optional_security_settings(node, 
force_use_zk_connection))
         for line in node.account.ssh_capture(cmd):
             if not line.startswith("SLF4J"):
                 yield line.rstrip()
@@ -600,9 +682,11 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
             node = self.nodes[0]
         self.logger.info("Altering message format version for topic %s with 
format %s", topic, msg_format_version)
 
+        force_use_zk_connection = not 
self.all_nodes_configs_command_uses_bootstrap_server()
+
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --entity-name %s --entity-type topics --alter 
--add-config message.format.version=%s" % \
-              (self.path.script("kafka-configs.sh", node), 
self._connect_setting_kafka_configs(node), topic, msg_format_version)
+        cmd += "%s --entity-name %s --entity-type topics --alter --add-config 
message.format.version=%s" % \
+              (self.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection), topic, msg_format_version)
         self.logger.info("Running alter message format command...\n%s" % cmd)
         node.account.ssh(cmd)
 
@@ -614,38 +698,54 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         else:
             self.logger.info("Disabling unclean leader election for topic %s", 
topic)
 
+        force_use_zk_connection = not 
self.all_nodes_configs_command_uses_bootstrap_server()
+
         cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s --entity-name %s --entity-type topics --alter 
--add-config unclean.leader.election.enable=%s" % \
-              (self.path.script("kafka-configs.sh", node), 
self._connect_setting_kafka_configs(node), topic, str(value).lower())
+        cmd += "%s --entity-name %s --entity-type topics --alter --add-config 
unclean.leader.election.enable=%s" % \
+              (self.kafka_configs_cmd_with_optional_security_settings(node, 
force_use_zk_connection), topic, str(value).lower())
         self.logger.info("Running alter unclean leader command...\n%s" % cmd)
         node.account.ssh(cmd)
 
-    def _connect_setting_kafka_configs(self, node):
-        # Use this for everything related to kafka-configs except User SCRAM 
Credentials
-        if self.all_nodes_configs_command_uses_bootstrap_server():
-            return "--bootstrap-server %s --command-config <(echo '%s')" % 
(self.bootstrap_servers(self.security_protocol),
-                                                                            
self.security_config.client_config())
+    def kafka_acls_cmd_with_optional_security_settings(self, node, 
force_use_zk_connection, kafka_security_protocol = None, 
override_command_config = None):
+        force_use_zk_connection = force_use_zk_connection or not 
self.all_nodes_acl_command_supports_bootstrap_server
+        if force_use_zk_connection:
+            bootstrap_server_or_authorizer_zk_props = "--authorizer-properties 
zookeeper.connect=%s" % (self.zk_connect_setting())
+            skip_optional_security_settings = True
         else:
-            return "--zookeeper %s %s" % (self.zk_connect_setting(), 
self.zk.zkTlsConfigFileOption())
-
-    def _connect_setting_kafka_configs_scram(self, node):
-        # Use this for kafka-configs when operating on User SCRAM Credentials
-        if self.all_nodes_configs_command_uses_bootstrap_server_scram():
-            return "--bootstrap-server %s --command-config <(echo '%s')" %\
-                   (self.bootstrap_servers(self.security_protocol),
-                    
self.security_config.client_config(use_inter_broker_mechanism_for_client = 
True))
+            if kafka_security_protocol is None:
+                # it wasn't specified, so use the inter-broker security 
protocol if it is PLAINTEXT,
+                # otherwise use the client security protocol
+                if self.interbroker_security_protocol == 
SecurityConfig.PLAINTEXT:
+                    security_protocol_to_use = SecurityConfig.PLAINTEXT
+                else:
+                    security_protocol_to_use = self.security_protocol
+            else:
+                security_protocol_to_use = kafka_security_protocol
+            bootstrap_server_or_authorizer_zk_props = "--bootstrap-server %s" 
% (self.bootstrap_servers(security_protocol_to_use))
+            skip_optional_security_settings = security_protocol_to_use == 
SecurityConfig.PLAINTEXT
+        if skip_optional_security_settings:
+            optional_jass_krb_system_props_prefix = ""
+            optional_command_config_suffix = ""
         else:
-            return "--zookeeper %s %s" % (self.zk_connect_setting(), 
self.zk.zkTlsConfigFileOption())
-
-    def kafka_acls_cmd(self, node, force_use_zk_connection):
-        """
-        Returns kafka-acls.sh command path with jaas configuration and krb5 
environment variable
-        set. If Admin client is not going to be used, don't set the 
environment variable.
-        """
+            # we need security configs because we aren't going to ZooKeeper and 
we aren't using PLAINTEXT
+            if (security_protocol_to_use == 
self.interbroker_security_protocol):
+                # configure JAAS to provide the broker's credentials
+                # since this is an authenticating cluster and we are going to 
use the inter-broker security protocol
+                jaas_conf_prop = 
KafkaService.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = True
+            else:
+                # configure JAAS to provide the typical client credentials
+                jaas_conf_prop = KafkaService.JAAS_CONF_PROPERTY
+                use_inter_broker_mechanism_for_client = False
+            optional_jass_krb_system_props_prefix = "KAFKA_OPTS='-D%s -D%s' " 
% (jaas_conf_prop, KafkaService.KRB5_CONF)
+            if override_command_config is None:
+                optional_command_config_suffix = " --command-config <(echo 
'%s')" % 
(self.security_config.client_config(use_inter_broker_mechanism_for_client = 
use_inter_broker_mechanism_for_client))
+            else:
+                optional_command_config_suffix = " --command-config %s" % 
(override_command_config)
         kafka_acls_script = self.path.script("kafka-acls.sh", node)
-        skip_security_settings = force_use_zk_connection or not 
self.all_nodes_acl_command_supports_bootstrap_server()
-        return kafka_acls_script if skip_security_settings else \
-            "KAFKA_OPTS='-D%s -D%s' %s" % (KafkaService.JAAS_CONF_PROPERTY, 
KafkaService.KRB5_CONF, kafka_acls_script)
+        return "%s%s %s%s" % \
+               (optional_jass_krb_system_props_prefix, kafka_acls_script,
+                bootstrap_server_or_authorizer_zk_props, 
optional_command_config_suffix)
 
     def run_cli_tool(self, node, cmd):
         output = ""
@@ -977,18 +1077,6 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot, self.zk_client_secure)
 
-    def _topic_command_connect_setting(self, node, force_use_zk_connection):
-        """
-        Checks if --bootstrap-server config is supported, if yes then returns 
a string with
-        bootstrap server, otherwise returns zookeeper connection string.
-        """
-        if not force_use_zk_connection and 
self.all_nodes_topic_command_supports_bootstrap_server():
-            connection_setting = "--bootstrap-server %s" % 
(self.bootstrap_servers(self.security_protocol))
-        else:
-            connection_setting = "--zookeeper %s" % (self.zk_connect_setting())
-
-        return connection_setting
-
     def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
         if validate and not port.open:
             raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " %
diff --git a/tests/kafkatest/services/security/kafka_acls.py 
b/tests/kafkatest/services/security/kafka_acls.py
index 3bb3e6f..96438bc 100644
--- a/tests/kafkatest/services/security/kafka_acls.py
+++ b/tests/kafkatest/services/security/kafka_acls.py
@@ -13,18 +13,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
-from kafkatest.services.kafka.util import fix_opts_for_new_jvm
 
-
-class ACLs(KafkaPathResolverMixin):
+class ACLs:
     def __init__(self, context):
         self.context = context
 
-    def set_acls(self, protocol, kafka, topic, group, 
force_use_zk_connection=False):
+    def set_acls(self, protocol, kafka, topic, group, 
force_use_zk_connection=False, additional_cluster_operations_to_grant = []):
+        """
+        Creates ACLs for the Kafka Broker principal that brokers use in tests
+
+        :param protocol: the security protocol to use (e.g. PLAINTEXT, 
SASL_PLAINTEXT, etc.)
+        :param kafka: Kafka cluster upon which ClusterAction ACL is created
+        :param topic: topic for which produce and consume ACLs are created
+        :param group: consumer group for which consume ACL is created
+        :param force_use_zk_connection: forces the use of ZooKeeper when true, 
otherwise AdminClient is used when available.
+               This is necessary for the case where we are bootstrapping ACLs 
before Kafka is started or before authorizer is enabled
+        :param additional_cluster_operations_to_grant: may be set to ['Alter', 
'Create'] if the cluster is secured since these are required
+               to create SCRAM credentials and topics, respectively
+        """
         # Set server ACLs
         kafka_principal = "User:CN=systemtest" if protocol == "SSL" else 
"User:kafka"
-        self.add_cluster_acl(kafka, kafka_principal, 
force_use_zk_connection=force_use_zk_connection)
+        self.add_cluster_acl(kafka, kafka_principal, 
force_use_zk_connection=force_use_zk_connection, 
additional_cluster_operations_to_grant = additional_cluster_operations_to_grant)
         self.add_read_acl(kafka, kafka_principal, "*", 
force_use_zk_connection=force_use_zk_connection)
 
         # Set client ACLs
@@ -32,39 +41,6 @@ class ACLs(KafkaPathResolverMixin):
         self.add_produce_acl(kafka, client_principal, topic, 
force_use_zk_connection=force_use_zk_connection)
         self.add_consume_acl(kafka, client_principal, topic, group, 
force_use_zk_connection=force_use_zk_connection)
 
-    def _acl_command_connect_setting(self, kafka, node, 
force_use_zk_connection):
-        """
-        Checks if --bootstrap-server config is supported, if yes then returns 
a string with
-        bootstrap server, otherwise returns authorizer properties for 
zookeeper connection.
-        """
-        if not force_use_zk_connection and 
kafka.all_nodes_acl_command_supports_bootstrap_server():
-            connection_setting = "--bootstrap-server %s" % 
(kafka.bootstrap_servers(kafka.security_protocol))
-        else:
-            connection_setting = "--authorizer-properties 
zookeeper.connect=%s" % (kafka.zk_connect_setting())
-
-        return connection_setting
-
-    def _kafka_acls_cmd_config(self, kafka, node, force_use_zk_connection):
-        """
-        Return --command-config parameter to the kafka-acls.sh command. The 
config parameter specifies
-        the security settings that AdminClient uses to connect to a secure 
kafka server.
-        """
-        skip_command_config = force_use_zk_connection or not 
kafka.all_nodes_acl_command_supports_bootstrap_server()
-        return "" if skip_command_config else " --command-config <(echo '%s')" 
% (kafka.security_config.client_config())
-
-    def _acl_cmd_prefix(self, kafka, node, force_use_zk_connection):
-        """
-        :param node: Node to use when determining connection settings
-        :param force_use_zk_connection: forces the use of ZooKeeper when true, 
otherwise AdminClient is used when available
-        :return command prefix for running kafka-acls
-        """
-        cmd = fix_opts_for_new_jvm(node)
-        cmd += "%s %s %s" % (
-            kafka.kafka_acls_cmd(node, force_use_zk_connection),
-            self._acl_command_connect_setting(kafka, node, 
force_use_zk_connection),
-            self._kafka_acls_cmd_config(kafka, node, force_use_zk_connection))
-        return cmd
-
     def _add_acl_on_topic(self, kafka, principal, topic, operation_flag, node, 
force_use_zk_connection):
         """
         :param principal: principal for which ACL is created
@@ -74,30 +50,32 @@ class ACLs(KafkaPathResolverMixin):
         :param force_use_zk_connection: forces the use of ZooKeeper when true, 
otherwise AdminClient is used when available
         """
         cmd = "%(cmd_prefix)s --add --topic=%(topic)s %(operation_flag)s 
--allow-principal=%(principal)s" % {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, 
force_use_zk_connection),
+            'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(kafka, node, 
force_use_zk_connection),
             'topic': topic,
             'operation_flag': operation_flag,
             'principal': principal
         }
         kafka.run_cli_tool(node, cmd)
 
-    def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False):
+    def add_cluster_acl(self, kafka, principal, force_use_zk_connection=False, 
additional_cluster_operations_to_grant = []):
         """
         :param kafka: Kafka cluster upon which ClusterAction ACL is created
         :param principal: principal for which ClusterAction ACL is created
         :param node: Node to use when determining connection settings
         :param force_use_zk_connection: forces the use of ZooKeeper when true, 
otherwise AdminClient is used when available.
                This is necessary for the case where we are bootstrapping ACLs 
before Kafka is started or before authorizer is enabled
+        :param additional_cluster_operations_to_grant: may be set to ['Alter', 
'Create'] if the cluster is secured since these are required
+               to create SCRAM credentials and topics, respectively
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not 
kafka.all_nodes_acl_command_supports_bootstrap_server()
-
-        cmd = "%(cmd_prefix)s --add --cluster --operation=ClusterAction 
--allow-principal=%(principal)s" % {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, 
force_use_zk_connection),
-            'principal': principal
-        }
-        kafka.run_cli_tool(node, cmd)
+        for operation in ['ClusterAction'] + 
additional_cluster_operations_to_grant:
+            cmd = "%(cmd_prefix)s --add --cluster --operation=%(operation)s 
--allow-principal=%(principal)s" % {
+                'cmd_prefix': 
kafka.kafka_acls_cmd_with_optional_security_settings(kafka, node, 
force_use_zk_connection),
+                'operation': operation,
+                'principal': principal
+            }
+            kafka.run_cli_tool(node, cmd)
 
     def add_read_acl(self, kafka, principal, topic, force_use_zk_connection=False):
         """
@@ -110,8 +88,6 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         self._add_acl_on_topic(kafka, principal, topic, "--operation=Read", node, force_use_zk_connection)
 
     def add_produce_acl(self, kafka, principal, topic, force_use_zk_connection=False):
@@ -125,8 +101,6 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         self._add_acl_on_topic(kafka, principal, topic, "--producer", node, force_use_zk_connection)
 
     def add_consume_acl(self, kafka, principal, topic, group, force_use_zk_connection=False):
@@ -141,10 +115,8 @@ class ACLs(KafkaPathResolverMixin):
         """
         node = kafka.nodes[0]
 
-        force_use_zk_connection = force_use_zk_connection or not kafka.all_nodes_acl_command_supports_bootstrap_server()
-
         cmd = "%(cmd_prefix)s --add --topic=%(topic)s --group=%(group)s --consumer --allow-principal=%(principal)s" % {
-            'cmd_prefix': self._acl_cmd_prefix(kafka, node, force_use_zk_connection),
+            'cmd_prefix': kafka.kafka_acls_cmd_with_optional_security_settings(kafka, node, force_use_zk_connection),
             'topic': topic,
             'group': group,
             'principal': principal
diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py
index 437084b..f68b93d 100644
--- a/tests/kafkatest/services/security/security_config.py
+++ b/tests/kafkatest/services/security/security_config.py
@@ -259,8 +259,16 @@ class SecurityConfig(TemplateRenderer):
         if self.static_jaas_conf:
             node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
             node.account.create_file(SecurityConfig.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PATH,
-                                     self.render_jaas_config("admin_client_as_broker_jaas.conf",
-                                                             {'SecurityConfig': SecurityConfig}))
+                                     self.render_jaas_config(
+                                         "admin_client_as_broker_jaas.conf",
+                                         {
+                                             'node': node,
+                                             'is_ibm_jdk': any('IBM' in line for line in java_version),
+                                             'SecurityConfig': SecurityConfig,
+                                             'client_sasl_mechanism': self.client_sasl_mechanism,
+                                             'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms
+                                         }
+                                     ))
 
         elif 'sasl.jaas.config' not in self.properties:
             self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n")
@@ -290,23 +298,6 @@ class SecurityConfig(TemplateRenderer):
         if java_version(node) <= 11 and self.properties.get('tls.version') == 'TLSv1.3':
             self.properties.update({'tls.version': 'TLSv1.2'})
 
-    def maybe_setup_broker_scram_credentials(self, node, path, connect):
-        self.maybe_create_scram_credentials(node, connect, path, self.interbroker_sasl_mechanism,
-                                            SecurityConfig.SCRAM_BROKER_USER, SecurityConfig.SCRAM_BROKER_PASSWORD)
-
-    def maybe_setup_client_scram_credentials(self, node, path, connect):
-        self.maybe_create_scram_credentials(node, connect, path, self.client_sasl_mechanism,
-                                            SecurityConfig.SCRAM_CLIENT_USER, SecurityConfig.SCRAM_CLIENT_PASSWORD,
-                                            self.export_kafka_opts_for_admin_client_as_broker())
-
-    def maybe_create_scram_credentials(self, node, connect, path, mechanism, user_name, password, kafka_opts_for_admin_client_as_broker = ""):
-        # we only need to create these credentials when the client and broker mechanisms are both SASL/SCRAM
-        if self.has_sasl and self.is_sasl_scram(mechanism) and self.is_sasl_scram(self.interbroker_sasl_mechanism):
-            cmd = "%s %s %s --entity-name %s --entity-type users --alter --add-config %s=[password=%s]" % \
-                  (kafka_opts_for_admin_client_as_broker, path.script("kafka-configs.sh", node), connect,
-                  user_name, mechanism, password)
-            node.account.ssh(cmd)
-
     def clean_node(self, node):
         if self.security_protocol != SecurityConfig.PLAINTEXT:
             node.account.ssh("rm -rf %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
@@ -366,14 +357,6 @@ class SecurityConfig(TemplateRenderer):
         else:
             return ""
 
-    def export_kafka_opts_for_admin_client_as_broker(self):
-        if self.has_sasl and self.static_jaas_conf:
-            kafka_opts_to_use = "\"-Djava.security.auth.login.config=%s -Djava.security.krb5.conf=%s\""\
-                                % (SecurityConfig.ADMIN_CLIENT_AS_BROKER_JAAS_CONF_PATH, SecurityConfig.KRB5CONF_PATH)
-        else:
-            kafka_opts_to_use = self.kafka_opts
-        return "export KAFKA_OPTS=%s;" % kafka_opts_to_use
-
     def props(self, prefix=''):
         """
         Return properties as string with line separators, optionally with a prefix.
diff --git a/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf b/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
index b21d5da..b53da11 100644
--- a/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
+++ b/tests/kafkatest/services/security/templates/admin_client_as_broker_jaas.conf
@@ -13,7 +13,31 @@
 
 
 KafkaClient {
+{% if "GSSAPI" in enabled_sasl_mechanisms %}
+{% if is_ibm_jdk %}
+    com.ibm.security.auth.module.Krb5LoginModule required debug=false
+    credsType=both
+    useKeytab="file:/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% else %}
+    com.sun.security.auth.module.Krb5LoginModule required debug=false
+    doNotPrompt=true
+    useKeyTab=true
+    storeKey=true
+    keyTab="/mnt/security/keytab"
+    principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
+{% endif %}
+{% endif %}
+{% if "PLAIN" in enabled_sasl_mechanisms %}
+       org.apache.kafka.common.security.plain.PlainLoginModule required
+       username="kafka"
+       password="kafka-secret"
+       user_client="client-secret"
+       user_kafka="kafka-secret";
+{% endif %}
+{% if "SCRAM-SHA-256" in client_sasl_mechanism or "SCRAM-SHA-512" in 
client_sasl_mechanism %}
        org.apache.kafka.common.security.scram.ScramLoginModule required
        username="{{ SecurityConfig.SCRAM_BROKER_USER }}"
        password="{{ SecurityConfig.SCRAM_BROKER_PASSWORD }}";
+{% endif %}
 };
diff --git a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
index 15eac3b..241c381 100644
--- a/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
+++ b/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py
@@ -97,7 +97,8 @@ class ZooKeeperSecurityUpgradeTest(ProduceConsumeValidateTest):
         if self.is_secure:
             self.kafka.authorizer_class_name = KafkaService.ACL_AUTHORIZER
             # Force use of direct ZooKeeper access because Kafka is not yet started
-            self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True)
+            self.acls.set_acls(security_protocol, self.kafka, self.topic, self.group, force_use_zk_connection=True,
+                               additional_cluster_operations_to_grant=['Create'])
 
         if self.no_sasl:
             self.kafka.start()

Reply via email to