This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ebef7d0 MINOR: TestSecurityRollingUpgrade system test fixes (#10886)
ebef7d0 is described below
commit ebef7d0c216ab6b86b5743bb29538dfa27224be4
Author: Ron Dagostino <[email protected]>
AuthorDate: Fri Jun 18 03:50:21 2021 -0400
MINOR: TestSecurityRollingUpgrade system test fixes (#10886)
The TestSecurityRollingUpgrade.
test_disable_separate_interbroker_listener() system test had a design flaw: it
was migrating inter-broker communication from a SASL_SSL listener to an SSL
listener in one roll while immediately removing the SASL_SSL listener in that
roll. This requires two rolls because the existing SASL_SSL listener must
remain available throughout the first roll so that unrolled brokers can
continue to communicate with rolled brokers throughout. This patch adds the se
[...]
The TestSecurityRollingUpgrade.test_rolling_upgrade_phase_two() system test
was not explicitly identifying the SASL mechanism to enable on a third port
when that port was using SASL but the client security protocol was not
SASL-based. This was resulting in an empty sasl.enabled.mechanisms config,
which applied to that third port, and then when the cluster was rolled to take
advantage of this third port for inter-broker communication the potential for
an inability to communicate with o [...]
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../tests/core/security_rolling_upgrade_test.py | 32 +++++++++++++++-------
1 file changed, 22 insertions(+), 10 deletions(-)
diff --git a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
index fb8812e..aa60878 100644
--- a/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
+++ b/tests/kafkatest/tests/core/security_rolling_upgrade_test.py
@@ -111,11 +111,18 @@ class
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.interbroker_sasl_mechanism = broker_sasl_mechanism
self.bounce()
- def remove_separate_broker_listener(self, client_security_protocol,
client_sasl_mechanism):
- # separate interbroker listener port will be closed automatically in
setup_interbroker_listener
- # if not using separate interbroker listener
- self.kafka.setup_interbroker_listener(client_security_protocol, False)
- self.kafka.interbroker_sasl_mechanism = client_sasl_mechanism
+ def remove_separate_broker_listener(self, client_security_protocol):
+ # This must be done in two phases: keep listening on the INTERNAL
listener while rolling once to switch
+ # the inter-broker security listener, then roll again to remove the
INTERNAL listener.
+ orig_inter_broker_security_protocol =
self.kafka.interbroker_security_protocol
+ self.kafka.setup_interbroker_listener(client_security_protocol, False)
# this closes the INTERNAL listener
+ # Re-open the INTERNAL listener
+ self.kafka.open_port(KafkaService.INTERBROKER_LISTENER_NAME)
+
self.kafka.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME].security_protocol
= orig_inter_broker_security_protocol
+
self.kafka.port_mappings[KafkaService.INTERBROKER_LISTENER_NAME].sasl_mechanism
= self.kafka.interbroker_sasl_mechanism
+ self.bounce()
+ # Close the INTERNAL listener for good and bounce again to fully
migrate to <client_security_protocol>
+ self.kafka.close_port(KafkaService.INTERBROKER_LISTENER_NAME)
self.bounce()
@cluster(num_nodes=8)
@@ -158,6 +165,11 @@ class
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
self.kafka.security_protocol = client_protocol
self.kafka.setup_interbroker_listener(SecurityConfig.PLAINTEXT,
use_separate_listener=False)
self.kafka.open_port(broker_protocol)
+ # Set any SASL mechanism explicitly when it isn't already set by the
client protocol
+ is_broker_protocol_sasl = broker_protocol in [SecurityConfig.SASL_SSL,
SecurityConfig.SASL_PLAINTEXT]
+ is_client_protocol_sasl = client_protocol in [SecurityConfig.SASL_SSL,
SecurityConfig.SASL_PLAINTEXT]
+ if is_broker_protocol_sasl and not is_client_protocol_sasl:
+ self.kafka.port_mappings[broker_protocol].sasl_mechanism =
SecurityConfig.SASL_MECHANISM_GSSAPI
self.kafka.start()
#Create Secured Producer and Consumer
@@ -239,12 +251,12 @@ class
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
Ensure we can produce and consume via SSL listener throughout.
"""
client_protocol = SecurityConfig.SSL
- client_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
+ interbroker_security_protocol = SecurityConfig.SASL_SSL
+ interbroker_sasl_mechanism = SecurityConfig.SASL_MECHANISM_GSSAPI
self.kafka.security_protocol = client_protocol
- self.kafka.client_sasl_mechanism = client_sasl_mechanism
- self.kafka.setup_interbroker_listener(SecurityConfig.SASL_SSL,
use_separate_listener=True)
- self.kafka.interbroker_sasl_mechanism =
SecurityConfig.SASL_MECHANISM_GSSAPI
+ self.kafka.setup_interbroker_listener(interbroker_security_protocol,
use_separate_listener=True)
+ self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self.kafka.start()
# create producer and consumer via client security protocol
@@ -252,5 +264,5 @@ class
TestSecurityRollingUpgrade(ProduceConsumeValidateTest):
# run produce/consume/validate loop while disabling a separate
interbroker listener via rolling restart
self.run_produce_consume_validate(
- self.remove_separate_broker_listener, client_protocol,
client_sasl_mechanism)
+ self.remove_separate_broker_listener, client_protocol)