This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f51d7d3  KAFKA-8557: system tests - add support for (optional) 
interbroker listener with the same security protocol as client listeners (#6938)
f51d7d3 is described below

commit f51d7d3c931ad9de34cf9633b5b97d9a7b5a0ad4
Author: Stanislav Vodetskyi <[email protected]>
AuthorDate: Fri Jun 21 09:51:43 2019 -0700

    KAFKA-8557: system tests - add support for (optional) interbroker listener 
with the same security protocol as client listeners (#6938)
    
    Reviewers: Brian Bushree <[email protected]>, Rajini Sivaram 
<[email protected]>
---
 tests/kafkatest/services/kafka/kafka.py            | 138 ++++++++++++++++-----
 .../services/kafka/templates/kafka.properties      |   3 +-
 .../tests/core/security_rolling_upgrade_test.py    |  96 ++++++++++----
 3 files changed, 182 insertions(+), 55 deletions(-)

diff --git a/tests/kafkatest/services/kafka/kafka.py 
b/tests/kafkatest/services/kafka/kafka.py
index 6c0920e..d77c784 100644
--- a/tests/kafkatest/services/kafka/kafka.py
+++ b/tests/kafkatest/services/kafka/kafka.py
@@ -32,7 +32,24 @@ from kafkatest.services.security.minikdc import MiniKdc
 from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.version import DEV_BRANCH, LATEST_0_10_0
 
-Port = collections.namedtuple('Port', ['name', 'number', 'open'])
+
+class KafkaListener:
+
+    def __init__(self, name, port_number, security_protocol, open=False):
+        self.name = name
+        self.port_number = port_number
+        self.security_protocol = security_protocol
+        self.open = open
+
+    def listener(self):
+        return "%s://:%s" % (self.name, str(self.port_number))
+
+    def advertised_listener(self, node):
+        return "%s://%s:%s" % (self.name, node.account.hostname, 
str(self.port_number))
+
+    def listener_security_protocol(self):
+        return "%s:%s" % (self.name, self.security_protocol)
+
 
 class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     PERSISTENT_ROOT = "/mnt/kafka"
@@ -50,6 +67,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, Service):
     # Kafka Authorizer
     SIMPLE_AUTHORIZER = "kafka.security.auth.SimpleAclAuthorizer"
     HEAP_DUMP_FILE = os.path.join(PERSISTENT_ROOT, "kafka_heap_dump.bin")
+    INTERBROKER_LISTENER_NAME = 'INTERNAL'
 
     logs = {
         "kafka_server_start_stdout_stderr": {
@@ -75,11 +93,32 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def __init__(self, context, num_nodes, zk, 
security_protocol=SecurityConfig.PLAINTEXT, 
interbroker_security_protocol=SecurityConfig.PLAINTEXT,
                  client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, 
interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
                  authorizer_class_name=None, topics=None, version=DEV_BRANCH, 
jmx_object_names=None,
-                 jmx_attributes=None, zk_connect_timeout=5000, 
zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None):
+                 jmx_attributes=None, zk_connect_timeout=5000, 
zk_session_timeout=6000, server_prop_overides=None, zk_chroot=None,
+                 use_separate_interbroker_listener=False):
         """
-        :type context
-        :type zk: ZookeeperService
-        :type topics: dict
+        :param context: test context
+        :param ZookeeperService zk:
+        :param dict topics: which topics to create automatically
+        :param str security_protocol: security protocol for clients to use
+        :param str interbroker_security_protocol: security protocol to use for 
broker-to-broker communication
+        :param str client_sasl_mechanism: sasl mechanism for clients to use
+        :param str interbroker_sasl_mechanism: sasl mechanism to use for 
broker-to-broker communication
+        :param str authorizer_class_name: which authorizer class to use
+        :param str version: which kafka version to use. Defaults to "dev" 
branch
+        :param jmx_object_names:
+        :param jmx_attributes:
+        :param int zk_connect_timeout:
+        :param int zk_session_timeout:
+        :param dict server_prop_overides: overrides for kafka.properties file
+        :param zk_chroot:
+        :param bool use_separate_interbroker_listener - if set, will use a 
separate interbroker listener,
+        with security protocol set to interbroker_security_protocol value. If 
set, requires
+        interbroker_security_protocol to be provided.
+        Normally port name is the same as its security protocol, so setting 
security_protocol and
+        interbroker_security_protocol to the same value will lead to a single 
port being open and both client
+        and broker-to-broker communication will go over that port. This 
parameter allows
+        you to add an interbroker listener with the same security protocol as 
a client listener, but running on a
+        separate port.
         """
         Service.__init__(self, context, num_nodes)
         JmxMixin.__init__(self, num_nodes=num_nodes, 
jmx_object_names=jmx_object_names, jmx_attributes=(jmx_attributes or []),
@@ -88,9 +127,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.zk = zk
 
         self.security_protocol = security_protocol
-        self.interbroker_security_protocol = interbroker_security_protocol
         self.client_sasl_mechanism = client_sasl_mechanism
-        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
         self.topics = topics
         self.minikdc = None
         self.authorizer_class_name = authorizer_class_name
@@ -121,37 +158,64 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
         self.zk_session_timeout = zk_session_timeout
 
         self.port_mappings = {
-            'PLAINTEXT': Port('PLAINTEXT', 9092, False),
-            'SSL': Port('SSL', 9093, False),
-            'SASL_PLAINTEXT': Port('SASL_PLAINTEXT', 9094, False),
-            'SASL_SSL': Port('SASL_SSL', 9095, False)
+            'PLAINTEXT': KafkaListener('PLAINTEXT', 9092, 'PLAINTEXT', False),
+            'SSL': KafkaListener('SSL', 9093, 'SSL', False),
+            'SASL_PLAINTEXT': KafkaListener('SASL_PLAINTEXT', 9094, 
'SASL_PLAINTEXT', False),
+            'SASL_SSL': KafkaListener('SASL_SSL', 9095, 'SASL_SSL', False),
+            KafkaService.INTERBROKER_LISTENER_NAME:
+                KafkaListener(KafkaService.INTERBROKER_LISTENER_NAME, 9099, 
None, False)
         }
 
+        self.interbroker_listener = None
+        self.setup_interbroker_listener(interbroker_security_protocol, 
use_separate_interbroker_listener)
+        self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
+
         for node in self.nodes:
             node.version = version
             node.config = KafkaConfig(**{config_property.BROKER_ID: 
self.idx(node)})
 
-
     def set_version(self, version):
         for node in self.nodes:
             node.version = version
 
     @property
+    def interbroker_security_protocol(self):
+        return self.interbroker_listener.security_protocol
+
+    # this is required for backwards compatibility - there are a lot of tests 
that set this property explicitly
+    # meaning 'use one of the existing listeners that match given security 
protocol, do not use custom listener'
+    @interbroker_security_protocol.setter
+    def interbroker_security_protocol(self, security_protocol):
+        self.setup_interbroker_listener(security_protocol, 
use_separate_listener=False)
+
+    def setup_interbroker_listener(self, security_protocol, 
use_separate_listener=False):
+        self.use_separate_interbroker_listener = use_separate_listener
+
+        if self.use_separate_interbroker_listener:
+            # do not close existing port here since it is not used exclusively 
for interbroker communication
+            self.interbroker_listener = 
self.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME]
+            self.interbroker_listener.security_protocol = security_protocol
+        else:
+            # close dedicated interbroker port, so it's not dangling in 
'listeners' and 'advertised.listeners'
+            self.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
+            self.interbroker_listener = self.port_mappings[security_protocol]
+
+    @property
     def security_config(self):
-        config = SecurityConfig(self.context, self.security_protocol, 
self.interbroker_security_protocol,
-                              zk_sasl=self.zk.zk_sasl,
-                              
client_sasl_mechanism=self.client_sasl_mechanism, 
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        config = SecurityConfig(self.context, self.security_protocol, 
self.interbroker_listener.security_protocol,
+                                zk_sasl=self.zk.zk_sasl,
+                                
client_sasl_mechanism=self.client_sasl_mechanism,
+                                
interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
+        for port in self.port_mappings.values():
             if port.open:
-                config.enable_security_protocol(port.name)
+                config.enable_security_protocol(port.security_protocol)
         return config
 
-    def open_port(self, protocol):
-        self.port_mappings[protocol] = 
self.port_mappings[protocol]._replace(open=True)
+    def open_port(self, listener_name):
+        self.port_mappings[listener_name].open = True
 
-    def close_port(self, protocol):
-        self.port_mappings[protocol] = 
self.port_mappings[protocol]._replace(open=False)
+    def close_port(self, listener_name):
+        self.port_mappings[listener_name].open = False
 
     def start_minikdc(self, add_principals=""):
         if self.security_config.has_sasl:
@@ -166,7 +230,7 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
 
     def start(self, add_principals=""):
         self.open_port(self.security_protocol)
-        self.open_port(self.interbroker_security_protocol)
+        self.interbroker_listener.open = True
 
         self.start_minikdc(add_principals)
         self._ensure_zk_chroot()
@@ -204,15 +268,18 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def set_protocol_and_port(self, node):
         listeners = []
         advertised_listeners = []
+        protocol_map = []
 
-        for protocol in self.port_mappings:
-            port = self.port_mappings[protocol]
+        for port in self.port_mappings.values():
             if port.open:
-                listeners.append(port.name + "://:" + str(port.number))
-                advertised_listeners.append(port.name + "://" +  
node.account.hostname + ":" + str(port.number))
+                listeners.append(port.listener())
+                advertised_listeners.append(port.advertised_listener(node))
+                protocol_map.append(port.listener_security_protocol())
 
         self.listeners = ','.join(listeners)
         self.advertised_listeners = ','.join(advertised_listeners)
+        self.listener_security_protocol_map = ','.join(protocol_map)
+        self.interbroker_bootstrap_servers = 
self.__bootstrap_servers(self.interbroker_listener, True)
 
     def prop_file(self, node):
         self.set_protocol_and_port(node)
@@ -676,18 +743,23 @@ class KafkaService(KafkaPathResolverMixin, JmxMixin, 
Service):
     def zk_connect_setting(self):
         return self.zk.connect_setting(self.zk_chroot)
 
+    def __bootstrap_servers(self, port, validate=True, offline_nodes=[]):
+        if validate and not port.open:
+            raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " %
+                             str(port.port_number))
+
+        return ','.join([node.account.hostname + ":" + str(port.port_number)
+                         for node in self.nodes
+                         if node not in offline_nodes])
+
     def bootstrap_servers(self, protocol='PLAINTEXT', validate=True, 
offline_nodes=[]):
         """Return comma-delimited list of brokers in this cluster formatted as 
HOSTNAME1:PORT1,HOSTNAME:PORT2,...
 
         This is the format expected by many config files.
         """
         port_mapping = self.port_mappings[protocol]
-        self.logger.info("Bootstrap client port is: " + 
str(port_mapping.number))
-
-        if validate and not port_mapping.open:
-            raise ValueError("We are retrieving bootstrap servers for the 
port: %s which is not currently open. - " % str(port_mapping))
-
-        return ','.join([node.account.hostname + ":" + 
str(port_mapping.number) for node in self.nodes if node not in offline_nodes])
+        self.logger.info("Bootstrap client port is: " + 
str(port_mapping.port_number))
+        return self.__bootstrap_servers(port_mapping, validate, offline_nodes)
 
     def controller(self):
         """ Get the controller node
diff --git a/tests/kafkatest/services/kafka/templates/kafka.properties 
b/tests/kafkatest/services/kafka/templates/kafka.properties
index 4362978..2736e91 100644
--- a/tests/kafkatest/services/kafka/templates/kafka.properties
+++ b/tests/kafkatest/services/kafka/templates/kafka.properties
@@ -19,8 +19,9 @@ advertised.host.name={{ node.account.hostname }}
 
 listeners={{ listeners }}
 advertised.listeners={{ advertised_listeners }}
+listener.security.protocol.map={{ listener_security_protocol_map }}
 
-security.inter.broker.protocol={{ 
security_config.interbroker_security_protocol }}
+inter.broker.listener.name={{ interbroker_listener.name }}
 
 ssl.keystore.location=/mnt/security/test.keystore.jks
 ssl.keystore.password=test-ks-passwd
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py 
b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index ba014ea..a64363c 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -12,15 +12,14 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+from kafkatest.services.security.security_config import SecurityConfig
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.console_consumer import ConsoleConsumer
 from kafkatest.utils import is_int
 from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
-from ducktape.mark import parametrize, matrix
+from ducktape.mark import matrix
 from ducktape.mark.resource import cluster
 from kafkatest.services.security.kafka_acls import ACLs
 import time
@@ -66,13 +65,12 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
             time.sleep(10)
 
     def roll_in_secured_settings(self, client_protocol, broker_protocol):
-
         # Roll cluster to include inter broker security protocol.
-        self.kafka.interbroker_security_protocol = broker_protocol
+        self.kafka.setup_interbroker_listener(broker_protocol)
         self.bounce()
 
         # Roll cluster to disable PLAINTEXT port
-        self.kafka.close_port('PLAINTEXT')
+        self.kafka.close_port(SecurityConfig.PLAINTEXT)
         self.set_authorizer_and_bounce(client_protocol, broker_protocol)
 
     def set_authorizer_and_bounce(self, client_protocol, broker_protocol):
@@ -100,17 +98,31 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         # Bounce again with ACLs for new mechanism
         self.set_authorizer_and_bounce(security_protocol, security_protocol)
 
+    def add_separate_broker_listener(self, broker_security_protocol, 
broker_sasl_mechanism):
+        self.kafka.setup_interbroker_listener(broker_security_protocol, True)
+        self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
+        # kafka opens interbroker port automatically in start() but not in 
bounce()
+        self.kafka.open_port(self.kafka.INTERBROKER_LISTENER_NAME)
+        self.bounce()
+
+    def remove_separate_broker_listener(self, client_security_protocol, 
client_sasl_mechanism):
+        # separate interbroker listener port will be closed automatically in 
setup_interbroker_listener
+        # if not using separate interbroker listener
+        self.kafka.setup_interbroker_listener(client_security_protocol, False)
+        self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+        self.bounce()
+
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SSL"])
+    @matrix(client_protocol=[SecurityConfig.SSL])
     @cluster(num_nodes=9)
-    @matrix(client_protocol=["SASL_PLAINTEXT", "SASL_SSL"])
+    @matrix(client_protocol=[SecurityConfig.SASL_PLAINTEXT, 
SecurityConfig.SASL_SSL])
     def test_rolling_upgrade_phase_one(self, client_protocol):
         """
         Start with a PLAINTEXT cluster, open a SECURED port, via a rolling 
upgrade, ensuring we could produce
         and consume throughout over PLAINTEXT. Finally check we can produce 
and consume the new secured port.
         """
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
-        self.kafka.security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT)
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
         self.kafka.start()
 
         # Create PLAINTEXT producer and consumer
@@ -125,7 +137,8 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @matrix(client_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"], 
broker_protocol=["SASL_SSL", "SSL", "SASL_PLAINTEXT"])
+    @matrix(client_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, 
SecurityConfig.SASL_PLAINTEXT],
+            broker_protocol=[SecurityConfig.SASL_SSL, SecurityConfig.SSL, 
SecurityConfig.SASL_PLAINTEXT])
     def test_rolling_upgrade_phase_two(self, client_protocol, broker_protocol):
         """
         Start with a PLAINTEXT cluster with a second Secured port open (i.e. 
result of phase one).
@@ -137,7 +150,7 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         """
         #Given we have a broker that has both secure and PLAINTEXT ports open
         self.kafka.security_protocol = client_protocol
-        self.kafka.interbroker_security_protocol = "PLAINTEXT"
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, 
use_separate_listener=False)
         self.kafka.open_port(broker_protocol)
         self.kafka.start()
 
@@ -148,16 +161,16 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(self.roll_in_secured_settings, 
client_protocol, broker_protocol)
 
     @cluster(num_nodes=9)
-    @parametrize(new_client_sasl_mechanism='PLAIN')
+    @matrix(new_client_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_one(self, 
new_client_sasl_mechanism):
         """
         Start with a SASL/GSSAPI cluster, add new SASL mechanism, via a 
rolling upgrade, ensuring we could produce
         and consume throughout over SASL/GSSAPI. Finally check we can produce 
and consume using new mechanism.
         """
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.client_sasl_mechanism = "GSSAPI"
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, 
use_separate_listener=False)
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+        self.kafka.interbroker_sasl_mechanism = 
SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         # Create SASL/GSSAPI producer and consumer
@@ -172,7 +185,7 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         self.run_produce_consume_validate(lambda: time.sleep(1))
 
     @cluster(num_nodes=8)
-    @parametrize(new_sasl_mechanism='PLAIN')
+    @matrix(new_sasl_mechanism=[SecurityConfig.SASL_MECHANISM_PLAIN])
     def test_rolling_upgrade_sasl_mechanism_phase_two(self, 
new_sasl_mechanism):
         """
         Start with a SASL cluster with GSSAPI for inter-broker and a second 
mechanism for clients (i.e. result of phase one).
@@ -182,10 +195,10 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         Ensure the producer and consumer run throughout
         """
         #Start with a broker that has GSSAPI for inter-broker and a second 
mechanism for clients
-        self.kafka.security_protocol = "SASL_SSL"
-        self.kafka.interbroker_security_protocol = "SASL_SSL"
+        self.kafka.security_protocol = SecurityConfig.SASL_SSL
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, 
use_separate_listener=False)
         self.kafka.client_sasl_mechanism = new_sasl_mechanism
-        self.kafka.interbroker_sasl_mechanism = "GSSAPI"
+        self.kafka.interbroker_sasl_mechanism = 
SecurityConfig.SASL_MECHANISM_GSSAPI
         self.kafka.start()
 
         #Create Producer and Consumer using second mechanism
@@ -194,3 +207,44 @@ class 
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
         #Roll in the second SASL mechanism for inter-broker, disabling first 
mechanism. Ensure we can produce and consume throughout
         self.run_produce_consume_validate(self.roll_in_sasl_mechanism, 
self.kafka.security_protocol, new_sasl_mechanism)
 
+    @cluster(num_nodes=9)
+    def test_enable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has a single PLAINTEXT listener.
+        Start producing/consuming on PLAINTEXT port.
+        While doing that, do a rolling restart to enable separate secured 
interbroker port
+        """
+        self.kafka.security_protocol = SecurityConfig.PLAINTEXT
+        self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT, 
use_separate_listener=False)
+
+        self.kafka.start()
+
+        self.create_producer_and_consumer()
+
+        self.run_produce_consume_validate(self.add_separate_broker_listener, 
SecurityConfig.SASL_SSL,
+                                          SecurityConfig.SASL_MECHANISM_PLAIN)
+
+    @cluster(num_nodes=9)
+    def test_disable_separate_interbroker_listener(self):
+        """
+        Start with a cluster that has two listeners, one on SSL (clients), 
another on SASL_SSL (broker-to-broker).
+        Start producer and consumer on SSL listener.
+        Close dedicated interbroker listener via rolling restart.
+        Ensure we can produce and consume via SSL listener throughout.
+        """
+        client_protocol = SecurityConfig.SSL
+        client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.security_protocol = client_protocol
+        self.kafka.client_sasl_mechanism = client_sasl_mechanism
+        self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL, 
use_separate_listener=True)
+        self.kafka.interbroker_sasl_mechanism = 
SecurityConfig.SASL_MECHANISM_GSSAPI
+
+        self.kafka.start()
+        # create producer and consumer via client security protocol
+        self.create_producer_and_consumer()
+
+        # run produce/consume/validate loop while disabling a separate 
interbroker listener via rolling restart
+        self.run_produce_consume_validate(
+            self.remove_separate_broker_listener, client_protocol, 
client_sasl_mechanism)
+

Reply via email to