This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 2.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 6de76f6fdf725bab7d6aa52893ba840e6518a7a4 Author: Stanislav Vodetskyi <[email protected]> AuthorDate: Fri Jun 21 09:51:43 2019 -0700 KAFKA-8557: system tests - add support for (optional) interbroker listener with the same security protocol as client listeners (#6938) Reviewers: Brian Bushree <[email protected]>, Rajini Sivaram <[email protected]> --- tests/kafkatest/services/kafka/kafka.py | 137 ++++++++++++++++----- .../services/kafka/templates/kafka.properties | 3 +- .../tests/core/security_rolling_upgrade_test.py | 96 +++++++++++---- 3 files changed, 181 insertions(+), 55 deletions(-) diff --git a/tests/kafkatest/services/kafka/kafka.py b/tests/kafkatest/services/kafka/kafka.py index fe972b9..e6e0256 100644 --- a/tests/kafkatest/services/kafka/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -32,7 +32,24 @@ from kafkatest.services.security.minikdc import MiniKdc from kafkatest.services.security.security_config import SecurityConfig from kafkatest.version import DEV_BRANCH, LATEST_0_10_0 -Port = collections.namedtuple('Port', ['name', 'number', 'open']) + +class KafkaListener: + + def __init__(self, name, port_number, security_protocol, open=False): + self.name = name + self.port_number = port_number + self.security_protocol = security_protocol + self.open = open + + def listener(self): + return "%s://:%s" % (self.name, str(self.port_number)) + + def advertised_listener(self, node): + return "%s://%s:%s" % (self.name, node.account.hostname, str(self.port_number)) + + def listener_security_protocol(self): + return "%s:%s" % (self.name, self.security_protocol) + class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): PERSISTENT_ROOT = "/mnt/kafka" @@ -50,6 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): # Kafka Authorizer SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer" HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin") + INTERBROKER_LISTENER_NAME = 'INTERNAL' logs = { "kafka_server_start_stdout_stderr": { @@ -76,11 +94,31 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=DEV_BRANCH, jmx_object_names=None, jmx_attributes=None, zk_connect_timeout=5000, zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None, - per_node_server_prop_overrides=None, extra_kafka_opts=""): + use_separate_interbroker_listener=False, per_node_server_prop_overrides=None, extra_kafka_opts=""): """ - :type context - :type zk: ZookeeperService - :type topics: dict + :param context: test context + :param ZookeeperService zk: + :param dict topics: which topics to create automatically + :param str security_protocol: security protocol for clients to use + :param str interbroker_security_protocol: security protocol to use for broker-to-broker communication + :param str client_sasl_mechanism: sasl mechanism for clients to use + :param str interbroker_sasl_mechanism: sasl mechanism to use for broker-to-broker communication + :param str authorizer_class_name: which authorizer class to use + :param str version: which kafka version to use. Defaults to "dev" branch + :param jmx_object_names: + :param jmx_attributes: + :param int zk_connect_timeout: + :param int zk_session_timeout: + :param dict server_prop_overides: overrides for kafka.properties file + :param zk_chroot: + :param bool use_separate_interbroker_listener - if set, will use a separate interbroker listener, + with security protocol set to interbroker_security_protocol value. If set, requires + interbroker_security_protocol to be provided. + Normally port name is the same as its security protocol, so setting security_protocol and + interbroker_security_protocol to the same value will lead to a single port being open and both client + and broker-to-broker communication will go over that port. This parameter allows + you to add an interbroker listener with the same security protocol as a client listener, but running on a + separate port. """ Service.__init__(self, context, num_nodes) JmxMixin.__init__(self, num_nodes=num_nodes, jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []), @@ -89,9 +127,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.zk = zk self.security_protocol = security_protocol - self.interbroker_security_protocol = interbroker_security_protocol self.client_sasl_mechanism = client_sasl_mechanism - self.interbroker_sasl_mechanism = interbroker_sasl_mechanism self.topics = topics self.minikdc = None self.authorizer_class_name = authorizer_class_name @@ -127,37 +163,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): self.zk_session_timeout = zk_session_timeout self.port_mappings = { - 'PLAINTEXT': Port('PLAINTEXT', 9092, False), - 'SSL': Port('SSL', 9093, False), - 'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False), - 'SASL_SSL': Port('SASL_SSL', 9095, False) + 'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False), + 'SSL': KafkaListener('SSL', 9093, 'SSL', False), + 'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 'SASL_PLAINTEXT', False), + 'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False), + KafkaService.INTERBROKER_LISTENER_NAME: + KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, None, False) } + self.interbroker_listener = None + self.setup_interbroker_listener(interbroker_security_protocol, use_separate_interbroker_listener) + self.interbroker_sasl_mechanism = interbroker_sasl_mechanism + for node in self.nodes: node.version = version node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)}) - def set_version(self, version): for node in self.nodes: node.version = version @property + def interbroker_security_protocol(self): + return self.interbroker_listener.security_protocol + + # this is required for backwards compatibility - there are a lot of tests that set this property explicitly + # meaning 'use one of the existing listeners that match given security protocol, do not use custom listener' + @interbroker_security_protocol.setter + def interbroker_security_protocol(self, security_protocol): + self.setup_interbroker_listener(security_protocol, use_separate_listener=False) + + def setup_interbroker_listener(self, security_protocol, use_separate_listener=False): + self.use_separate_interbroker_listener = use_separate_listener + + if self.use_separate_interbroker_listener: + # do not close existing port here since it is not used exclusively for interbroker communication + self.interbroker_listener = self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME] + self.interbroker_listener.security_protocol = security_protocol + else: + # close dedicated interbroker port, so it's not dangling in 'listeners' and 'advertised.listeners' + self.close_port(KafkaService.INTERBROKER_LISTENER_NAME) + self.interbroker_listener = self.port_mappings[security_protocol] + + @property def security_config(self): - config = SecurityConfig(self.context, self.security_protocol, self.interbroker_security_protocol, - zk_sasl=self.zk.zk_sasl, - client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism) - for protocol in self.port_mappings: - port = self.port_mappings[protocol] + config = SecurityConfig(self.context, self.security_protocol, self.interbroker_listener.security_protocol, + zk_sasl=self.zk.zk_sasl, + client_sasl_mechanism=self.client_sasl_mechanism, + interbroker_sasl_mechanism=self.interbroker_sasl_mechanism) + for port in self.port_mappings.values(): if port.open: - config.enable_security_protocol(port.name) + config.enable_security_protocol(port.security_protocol) return config - def open_port(self, protocol): - self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True) + def open_port(self, listener_name): + self.port_mappings[listener_name].open = True - def close_port(self, protocol): - self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=False) + def close_port(self, listener_name): + self.port_mappings[listener_name].open = False def start_minikdc(self, add_principals=""): if self.security_config.has_sasl: @@ -172,7 +235,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def start(self, add_principals=""): self.open_port(self.security_protocol) - self.open_port(self.interbroker_security_protocol) + self.interbroker_listener.open = True self.start_minikdc(add_principals) self._ensure_zk_chroot() @@ -210,15 +273,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def set_protocol_and_port(self, node): listeners = [] advertised_listeners = [] + protocol_map = [] - for protocol in self.port_mappings: - port = self.port_mappings[protocol] + for port in self.port_mappings.values(): if port.open: - listeners.append(port.name + "://:" + str(port.number)) - advertised_listeners.append(port.name + "://" + node.account.hostname + ":" + str(port.number)) + listeners.append(port.listener()) + advertised_listeners.append(port.advertised_listener(node)) + protocol_map.append(port.listener_security_protocol()) self.listeners = ','.join(listeners) self.advertised_listeners = ','.join(advertised_listeners) + self.listener_security_protocol_map = ','.join(protocol_map) + self.interbroker_bootstrap_servers = self.__bootstrap_servers(self.interbroker_listener, True) def prop_file(self, node): self.set_protocol_and_port(node) @@ -685,18 +751,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service): def zk_connect_setting(self): return self.zk.connect_setting(self.zk_chroot) + def __bootstrap_servers(self, port, validate=True, offline_nodes=[]): + if validate and not port.open: + raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % + str(port.port_number)) + + return ','.join([node.account.hostname + ":" + str(port.port_number) + for node in self.nodes + if node not in offline_nodes]) + def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, offline_nodes=[]): """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,... This is the format expected by many config files. """ port_mapping = self.port_mappings[protocol] - self.logger.info("Bootstrap client port is: " + str(port_mapping.number)) - - if validate and not port_mapping.open: - raise ValueError("We are retrieving bootstrap servers for the port: %s which is not currently open. - " % str(port_mapping)) - - return ','.join([node.account.hostname + ":" + str(port_mapping.number) for node in self.nodes if node not in offline_nodes]) + self.logger.info("Bootstrap client port is: " + str(port_mapping.port_number)) + return self.__bootstrap_servers(port_mapping, validate, offline_nodes) def controller(self): """ Get the controller node diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties index 4362978..2736e91 100644 --- a/tests/kafkatest/services/kafka/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -19,8 +19,9 @@ advertised.host.name={{ node.account.hostname }} listeners={{ listeners }} advertised.listeners={{ advertised_listeners }} +listener.security.protocol.map={{ listener_security_protocol_map }} -security.inter.broker.protocol={{ security_config.interbroker_security_protocol }} +inter.broker.listener.name={{ interbroker_listener.name }} ssl.keystore.location=/mnt/security/test.keystore.jks ssl.keystore.password=test-ks-passwd diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py index ba014ea..a64363c 100644 --- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py +++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py @@ -12,15 +12,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +from kafkatest.services.security.security_config import SecurityConfig from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.utils import is_int from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest -from ducktape.mark import parametrize, matrix +from ducktape.mark import matrix from ducktape.mark.resource import cluster from kafkatest.services.security.kafka_acls import ACLs import time @@ -66,13 +65,12 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): time.sleep(10) def roll_in_secured_settings(self, client_protocol, broker_protocol): - # Roll cluster to include inter broker security protocol. - self.kafka.interbroker_security_protocol = broker_protocol + self.kafka.setup_interbroker_listener(broker_protocol) self.bounce() # Roll cluster to disable PLAINTEXT port - self.kafka.close_port('PLAINTEXT') + self.kafka.close_port(SecurityConfig.PLAINTEXT) self.set_authorizer_and_bounce(client_protocol, broker_protocol) def set_authorizer_and_bounce(self, client_protocol, broker_protocol): @@ -100,17 +98,31 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): # Bounce again with ACLs for new mechanism self.set_authorizer_and_bounce(security_protocol, security_protocol) + def add_separate_broker_listener(self, broker_security_protocol, broker_sasl_mechanism): + self.kafka.setup_interbroker_listener(broker_security_protocol, True) + self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism + # kafka opens interbroker port automatically in start() but not in bounce() + self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME) + self.bounce() + + def remove_separate_broker_listener(self, client_security_protocol, client_sasl_mechanism): + # separate interbroker listener port will be closed automatically in setup_interbroker_listener + # if not using separate interbroker listener + self.kafka.setup_interbroker_listener(client_security_protocol, False) + self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism + self.bounce() + @cluster(num_nodes=8) - @matrix(client_protocol=["SSL"]) + @matrix(client_protocol=[SecurityConfig.SSL]) @cluster(num_nodes=9) - @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"]) + @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, SecurityConfig.SASL_SSL]) def test_rolling_upgrade_phase_one(self, client_protocol): """ Start with a PLAINTEXT cluster, open a SECURED port, via a rolling upgrade, ensuring we could produce and consume throughout over PLAINTEXT. Finally check we can produce and consume the new secured port. """ - self.kafka.interbroker_security_protocol = "PLAINTEXT" - self.kafka.security_protocol = "PLAINTEXT" + self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT) + self.kafka.security_protocol = SecurityConfig.PLAINTEXT self.kafka.start() # Create PLAINTEXT producer and consumer @@ -125,7 +137,8 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): self.run_produce_consume_validate(lambda: time.sleep(1)) @cluster(num_nodes=8) - @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"]) + @matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT], + broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, SecurityConfig.SASL_PLAINTEXT]) def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol): """ Start with a PLAINTEXT cluster with a second Secured port open (i.e. result of phase one). @@ -137,7 +150,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): """ #Given we have a broker that has both secure and PLAINTEXT ports open self.kafka.security_protocol = client_protocol - self.kafka.interbroker_security_protocol = "PLAINTEXT" + self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False) self.kafka.open_port(broker_protocol) self.kafka.start() @@ -148,16 +161,16 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): self.run_produce_consume_validate(self.roll_in_secured_settings, client_protocol, broker_protocol) @cluster(num_nodes=9) - @parametrize(new_client_sasl_mechanism='PLAIN') + @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN]) def test_rolling_upgrade_sasl_mechanism_phase_one(self, new_client_sasl_mechanism): """ Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a rolling upgrade, ensuring we could produce and consume throughout over SASL/GSSAPI. Finally check we can produce and consume using new mechanism. """ - self.kafka.interbroker_security_protocol = "SASL_SSL" - self.kafka.security_protocol = "SASL_SSL" - self.kafka.client_sasl_mechanism = "GSSAPI" - self.kafka.interbroker_sasl_mechanism = "GSSAPI" + self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False) + self.kafka.security_protocol = SecurityConfig.SASL_SSL + self.kafka.client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI + self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI self.kafka.start() # Create SASL/GSSAPI producer and consumer @@ -172,7 +185,7 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): self.run_produce_consume_validate(lambda: time.sleep(1)) @cluster(num_nodes=8) - @parametrize(new_sasl_mechanism='PLAIN') + @matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN]) def test_rolling_upgrade_sasl_mechanism_phase_two(self, new_sasl_mechanism): """ Start with a SASL cluster with GSSAPI for inter-broker and a second mechanism for clients (i.e. result of phase one). @@ -182,10 +195,10 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): Ensure the producer and consumer run throughout """ #Start with a broker that has GSSAPI for inter-broker and a second mechanism for clients - self.kafka.security_protocol = "SASL_SSL" - self.kafka.interbroker_security_protocol = "SASL_SSL" + self.kafka.security_protocol = SecurityConfig.SASL_SSL + self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=False) self.kafka.client_sasl_mechanism = new_sasl_mechanism - self.kafka.interbroker_sasl_mechanism = "GSSAPI" + self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI self.kafka.start() #Create Producer and Consumer using second mechanism @@ -194,3 +207,44 @@ class TestSecurityRollingUpgrade(ProduceConsumeValidateTest): #Roll in the second SASL mechanism for inter-broker, disabling first mechanism. Ensure we can produce and consume throughout self.run_produce_consume_validate(self.roll_in_sasl_mechanism, self.kafka.security_protocol, new_sasl_mechanism) + @cluster(num_nodes=9) + def test_enable_separate_interbroker_listener(self): + """ + Start with a cluster that has a single PLAINTEXT listener. + Start producing/consuming on PLAINTEXT port. + While doing that, do a rolling restart to enable separate secured interbroker port + """ + self.kafka.security_protocol = SecurityConfig.PLAINTEXT + self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, use_separate_listener=False) + + self.kafka.start() + + self.create_producer_and_consumer() + + self.run_produce_consume_validate(self.add_separate_broker_listener, SecurityConfig.SASL_SSL, + SecurityConfig.SASL_MECHANISM_PLAIN) + + @cluster(num_nodes=9) + def test_disable_separate_interbroker_listener(self): + """ + Start with a cluster that has two listeners, one on SSL (clients), another on SASL_SSL (broker-to-broker). + Start producer and consumer on SSL listener. + Close dedicated interbroker listener via rolling restart. + Ensure we can produce and consume via SSL listener throughout. + """ + client_protocol = SecurityConfig.SSL + client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI + + self.kafka.security_protocol = client_protocol + self.kafka.client_sasl_mechanism = client_sasl_mechanism + self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, use_separate_listener=True) + self.kafka.interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI + + self.kafka.start() + # create producer and consumer via client security protocol + self.create_producer_and_consumer() + + # run produce/consume/validate loop while disabling a separate interbroker listener via rolling restart + self.run_produce_consume_validate( + self.remove_separate_broker_listener, client_protocol, client_sasl_mechanism) +
