This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 5bbe32b174131652e09d7288374d33b3fb35361a Author: Brian Bushree <[email protected]> AuthorDate: Thu Jun 27 10:10:43 2019 -0700 MINOR: Support listener config overrides in system tests (#6981) Reviewers: Rajini Sivaram <[email protected]> --- tests/kafkatest/services/kafka/kafka.py | 26 +++++++--------- .../services/kafka/templates/kafka.properties | 18 +++++++++++ .../services/security/listener_security_config.py | 36 ++++++++++++++++++++++ .../kafkatest/services/security/security_config.py | 35 +++++++++++++-------- 4 files changed, 87 insertions(+), 28 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index e6e0256..d39ff6a 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -29,6 +29,7 @@ from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin from kafkatest.services.kafka import config_property from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.security.minikdc import MiniKdc +from kafkatest.services.security.listener_security_config import ListenerSecurityConfig from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH, LATEST_0_10_0 @@ -50,7 +51,6 @@ class KafkaListener: def listener_security_protocol(self): return "%s:%s" % (self.name, self.security_protocol) - class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): PERSISTENT_ROOT = "/mnt/kafka" STDOUT_STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "server-start-stdout-stderr.log") @@ -94,7 +94,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None, jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None, - 
use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""): + listener_security_config=ListenerSecurityConfig(), per_node_server_prop_overrides=None, extra_kafka_opts=""): """ :param context: test context :param ZookeeperService zk: @@ -111,14 +111,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): :param int zk_session_timeout: :param dict server_prop_overides: overrides for kafka.properties file :param zk_chroot: - :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener, - with security protocol set to interbroker_security_protocol value. If set, requires - interbroker_security_protocol to be provided. - Normally port name is the same as its security protocol, so setting security_protocol and - interbroker_security_protocol to the same value will lead to a single port being open and both client - and broker-to-broker communication will go over that port. This parameter allows - you to add an interbroker listener with the same security protocol as a client listener, but running on a - separate port. 
+ :param ListenerSecurityConfig listener_security_config: listener config to use """ Service.__init__(self, context, num_nodes) JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), @@ -143,6 +136,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.log_level = "DEBUG" self.zk_chroot = zk_chroot self.extra_kafka_opts = extra_kafka_opts + self.listener_security_config = listener_security_config # # In a heavily loaded and not very fast machine, it is @@ -172,7 +166,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): } self.interbroker_listener = None - self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener) + self.setup_interbroker_listener(interbroker_security_protocol, self.listener_security_config.use_separate_interbroker_listener) self.interbroker_sasl_mechanism = interbroker_sasl_mechanism for node in self.nodes: @@ -194,9 +188,9 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.setup_interbroker_listener(security_protocol, use_separate_listener=False) def setup_interbroker_listener(self, security_protocol, use_separate_listener=False): - self.use_separate_interbroker_listener = use_separate_listener + self.listener_security_config.use_separate_interbroker_listener = use_separate_listener - if self.use_separate_interbroker_listener: + if self.listener_security_config.use_separate_interbroker_listener: # do not close existing port here since it is not used exclusively for interbroker communication self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME] self.interbroker_listener.security_protocol = security_protocol @@ -210,7 +204,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol, zk_sasl=self.zk.zk_sasl, 
client_sasl_mechanism=self.client_sasl_mechanism, - interbroker_sasl_mechanism=self.interbroker_sasl_mechanism) + interbroker_sasl_mechanism=self.interbroker_sasl_mechanism, + listener_security_config=self.listener_security_config) for port in self.port_mappings.values(): if port.open: config.enable_security_protocol(port.security_protocol) @@ -291,7 +286,8 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): #load template configs as dictionary config_template = self.render('kafka.properties', node=node, broker_id=self.idx(node), - security_config=self.security_config, num_nodes=self.num_nodes) + security_config=self.security_config, num_nodes=self.num_nodes, + listener_security_config=self.listener_security_config) configs = dict( l.rstrip().split('=', 1) for l in config_template.split('\n') if not l.startswith("#") and "=" in l ) diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 11e43be..6060bfa 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -27,6 +27,24 @@ inter.broker.listener.name={{ interbroker_listener.name }} security.inter.broker.protocol={{ interbroker_listener.security_protocol }} {% endif %} +{% for k, v in listener_security_config.client_listener_overrides.iteritems() %} +{% if k.startswith('sasl.') %} +listener.name.{{ security_protocol.lower() }}.{{ security_config.client_sasl_mechanism.lower() }}.{{ k }}={{ v }} +{% else %} +listener.name.{{ security_protocol.lower() }}.{{ k }}={{ v }} +{% endif %} +{% endfor %} + +{% if interbroker_listener.name != security_protocol %} +{% for k, v in listener_security_config.interbroker_listener_overrides.iteritems() %} +{% if k.startswith('sasl.') %} +listener.name.{{ interbroker_listener.name.lower() }}.{{ security_config.interbroker_sasl_mechanism.lower() }}.{{ k }}={{ v }} +{% else %} +listener.name.{{ 
interbroker_listener.name.lower() }}.{{ k }}={{ v }} +{% endif %} +{% endfor %} +{% endif %} + ssl.keystore.location=/mnt/security/test.keystore.jks ssl.keystore.password=test-ks-passwd ssl.key.password=test-key-passwd diff --git a/tests/kafkatest/services/security/listener_security_config.py b/tests/kafkatest/services/security/listener_security_config.py new file mode 100644 index 0000000..74e9e39 --- /dev/null +++ b/tests/kafkatest/services/security/listener_security_config.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class ListenerSecurityConfig: + + def __init__(self, use_separate_interbroker_listener=False, + client_listener_overrides={}, interbroker_listener_overrides={}): + """ + :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener, + with security protocol set to interbroker_security_protocol value. If set, requires + interbroker_security_protocol to be provided. + Normally port name is the same as its security protocol, so setting security_protocol and + interbroker_security_protocol to the same value will lead to a single port being open and both client + and broker-to-broker communication will go over that port. 
This parameter allows + you to add an interbroker listener with the same security protocol as a client listener, but running on a + separate port. + :param dict client_listener_overrides - non-prefixed listener config overrides for named client listener + (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc). + :param dict interbroker_listener_overrides - non-prefixed listener config overrides for named interbroker + listener (for example 'sasl.jaas.config', 'ssl.keystore.location', 'sasl.login.callback.handler.class', etc). + """ + self.use_separate_interbroker_listener = use_separate_interbroker_listener + self.client_listener_overrides = client_listener_overrides + self.interbroker_listener_overrides = interbroker_listener_overrides \ No newline at end of file diff --git a/tests/kafkatest/services/security/security_config.py b/tests/kafkatest/services/security/security_config.py index b2fa489..0398557 100644 --- a/tests/kafkatest/services/security/security_config.py +++ b/tests/kafkatest/services/security/security_config.py @@ -19,6 +19,7 @@ from tempfile import mkdtemp from shutil import rmtree from ducktape.template import TemplateRenderer from kafkatest.services.security.minikdc import MiniKdc +from kafkatest.services.security.listener_security_config import ListenerSecurityConfig import itertools @@ -112,7 +113,8 @@ class SecurityConfig(TemplateRenderer): def __init__(self, context, security_protocol=None, interbroker_security_protocol=None, client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI, - zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None): + zk_sasl=False, template_props="", static_jaas_conf=True, jaas_override_variables=None, + listener_security_config=ListenerSecurityConfig()): """ Initialize the security properties for the node and copy keystore and truststore to the remote node if the transport protocol @@ -144,6 +146,7 @@ 
class SecurityConfig(TemplateRenderer): self.has_ssl = self.is_ssl(security_protocol) or self.is_ssl(interbroker_security_protocol) self.zk_sasl = zk_sasl self.static_jaas_conf = static_jaas_conf + self.listener_security_config = listener_security_config self.properties = { 'security.protocol' : security_protocol, 'ssl.keystore.location' : SecurityConfig.KEYSTORE_PATH, @@ -156,6 +159,7 @@ class SecurityConfig(TemplateRenderer): 'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism, 'sasl.kerberos.service.name' : 'kafka' } + self.properties.update(self.listener_security_config.client_listener_overrides) self.jaas_override_variables = jaas_override_variables or {} def client_config(self, template_props="", node=None, jaas_override_variables=None): @@ -169,7 +173,8 @@ class SecurityConfig(TemplateRenderer): client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props, static_jaas_conf=static_jaas_conf, - jaas_override_variables=jaas_override_variables) + jaas_override_variables=jaas_override_variables, + listener_security_config=self.listener_security_config) def enable_security_protocol(self, security_protocol): self.has_sasl = self.has_sasl or self.is_sasl(security_protocol) @@ -185,20 +190,24 @@ class SecurityConfig(TemplateRenderer): jaas_conf_file = "jaas.conf" java_version = node.account.ssh_capture("java -version") - jaas_conf = self.render_jaas_config( - jaas_conf_file, - { - 'node': node, - 'is_ibm_jdk': any('IBM' in line for line in java_version), - 'SecurityConfig': SecurityConfig, - 'client_sasl_mechanism': self.client_sasl_mechanism, - 'enabled_sasl_mechanisms': self.enabled_sasl_mechanisms - } - ) + jaas_conf = None + if 'sasl.jaas.config' not in self.properties: + jaas_conf = self.render_jaas_config( + jaas_conf_file, + { + 'node': node, + 'is_ibm_jdk': any('IBM' in line for line in java_version), + 'SecurityConfig': SecurityConfig, + 'client_sasl_mechanism': self.client_sasl_mechanism, + 'enabled_sasl_mechanisms': 
self.enabled_sasl_mechanisms + } + ) + else: + jaas_conf = self.properties['sasl.jaas.config'] if self.static_jaas_conf: node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf) - else: + elif 'sasl.jaas.config' not in self.properties: self.properties['sasl.jaas.config'] = jaas_conf.replace("\n", " \\\n") if self.has_sasl_kerberos: node.account.copy_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
