Repository: ambari Updated Branches: refs/heads/trunk 169dd4b63 -> 4b9a81684
AMBARI-17694 - Kafka listeners property does not show SASL_PLAINTEXT protocol when Kerberos is enabled (Anita Jebaraj via jonathanhurley) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4b9a8168 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4b9a8168 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4b9a8168 Branch: refs/heads/trunk Commit: 4b9a816846d56dc027fd3492211ec9ec057c7890 Parents: 169dd4b Author: Jonathan Hurley <[email protected]> Authored: Mon Aug 22 12:53:18 2016 -0400 Committer: Jonathan Hurley <[email protected]> Committed: Mon Aug 22 12:53:18 2016 -0400 ---------------------------------------------------------------------- .../kerberos/VariableReplacementHelper.java | 34 ++++++++++- .../server/upgrade/UpgradeCatalog240.java | 25 ++++++++ .../common-services/KAFKA/0.10.0/kerberos.json | 3 +- .../KAFKA/0.8.1/package/scripts/kafka.py | 15 ++--- .../common-services/KAFKA/0.9.0/kerberos.json | 3 +- .../stacks/HDP/2.3/services/stack_advisor.py | 17 +++++- .../kerberos/VariableReplacementHelperTest.java | 8 ++- .../server/upgrade/UpgradeCatalog240Test.java | 61 ++++++++++++++++++++ 8 files changed, 148 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java index 66be3bf..d472b79 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelper.java @@ -20,7 +20,6 @@ package org.apache.ambari.server.state.kerberos; import com.google.inject.Singleton; import org.apache.ambari.server.AmbariException; - import java.util.HashMap; import java.util.Map; import java.util.regex.Matcher; @@ -49,6 +48,7 @@ public class VariableReplacementHelper { { put("each", new EachFunction()); put("toLower", new ToLowerFunction()); + put("replace", new ReplaceValue()); } }; @@ -226,7 +226,37 @@ public class VariableReplacementHelper { return ""; } } - + /** + * ReplaceValue is a Function implementation that replaces the value in the string + * <p/> + * This function expects the following arguments (in order) within the args array: + * <ol> + * <li>regular expression that should be replaced</li> + * <li>replacement value for the string</li> + * </ol> + */ + private static class ReplaceValue implements Function { + + @Override + public String perform(String[] args, String data) { + if ((args == null) || (args.length != 2)) { + throw new IllegalArgumentException("Invalid number of arguments encountered"); + } + if (data != null) { + StringBuffer builder = new StringBuffer(); + String regex = args[0]; + String replacement = args[1]; + Pattern pattern = Pattern.compile(regex); + Matcher matcher = pattern.matcher(data); + while(matcher.find()) { + matcher.appendReplacement(builder, replacement); + } + matcher.appendTail(builder); + return builder.toString(); + } + return ""; + } + } /** * ToLowerFunction is a Function implementation that converts a String to lowercase */ http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java index 12553a5..5cd8685 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/upgrade/UpgradeCatalog240.java @@ -191,6 +191,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { protected static final String EXTENSION_ID_COLUMN = "extension_id"; protected static final String EXTENSION_LINK_TABLE = "extensionlink"; protected static final String EXTENSION_LINK_ID_COLUMN = "link_id"; + protected static final String KAFKA_BROKER_CONFIG = "kafka-broker"; private static final Map<String, Integer> ROLE_ORDER; private static final String AMS_HBASE_SITE = "ams-hbase-site"; @@ -390,6 +391,7 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { addManageUserPersistedDataPermission(); allowClusterOperatorToManageCredentials(); updateHDFSConfigs(); + updateKAFKAConfigs(); updateHIVEConfigs(); updateAMSConfigs(); updateClusterEnv(); @@ -1932,7 +1934,30 @@ public class UpgradeCatalog240 extends AbstractUpgradeCatalog { } } } + + protected void updateKAFKAConfigs() throws AmbariException { + AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); + Clusters clusters = ambariManagementController.getClusters(); + if (clusters != null) { + Map<String, Cluster> clusterMap = getCheckedClusterMap(clusters); + if (clusterMap != null && !clusterMap.isEmpty()) { + for (final Cluster cluster : clusterMap.values()) { + Set<String> installedServices = cluster.getServices().keySet(); + if (installedServices.contains("KAFKA") && cluster.getSecurityType() == SecurityType.KERBEROS) { + Config kafkaBroker = cluster.getDesiredConfigByType(KAFKA_BROKER_CONFIG); + if (kafkaBroker != null) { + String listenersPropertyValue = kafkaBroker.getProperties().get("listeners"); + if (StringUtils.isNotEmpty(listenersPropertyValue)) { + String newListenersPropertyValue = listenersPropertyValue.replaceAll("\\bPLAINTEXT\\b", "PLAINTEXTSASL"); + updateConfigurationProperties(KAFKA_BROKER_CONFIG, Collections.singletonMap("listeners", newListenersPropertyValue), true, false); + } + } + } + } + } + } + } protected void updateHIVEConfigs() throws AmbariException { AmbariManagementController ambariManagementController = injector.getInstance(AmbariManagementController.class); http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json index e1e6461..1f02092 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.10.0/kerberos.json @@ -14,7 +14,8 @@ "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal", "super.users": "user:${kafka-env/kafka_user}", "security.inter.broker.protocol": "PLAINTEXTSASL", - "zookeeper.set.acl": "true" + "zookeeper.set.acl": "true", + "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}" } }, { http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py index 9066ab5..6cc85f4 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.8.1/package/scripts/kafka.py @@ -80,21 +80,16 @@ def kafka(upgrade_type=None): listeners = kafka_server_config['listeners'].replace("localhost", params.hostname) Logger.info(format("Kafka listeners: {listeners}")) + kafka_server_config['listeners'] = listeners if params.security_enabled and params.kafka_kerberos_enabled: Logger.info("Kafka kerberos security is enabled.") - if "SASL" not in listeners: - listeners = listeners.replace("PLAINTEXT", "PLAINTEXTSASL") - - kafka_server_config['listeners'] = listeners kafka_server_config['advertised.listeners'] = listeners Logger.info(format("Kafka advertised listeners: {listeners}")) - else: - kafka_server_config['listeners'] = listeners - if 'advertised.listeners' in kafka_server_config: - advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname) - kafka_server_config['advertised.listeners'] = advertised_listeners - Logger.info(format("Kafka advertised listeners: {advertised_listeners}")) + elif 'advertised.listeners' in kafka_server_config: + advertised_listeners = kafka_server_config['advertised.listeners'].replace("localhost", params.hostname) + kafka_server_config['advertised.listeners'] = advertised_listeners + Logger.info(format("Kafka advertised listeners: {advertised_listeners}")) else: kafka_server_config['host.name'] = params.hostname http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json index 2b1c01b..ab1ed1f 100644 --- a/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json +++ b/ambari-server/src/main/resources/common-services/KAFKA/0.9.0/kerberos.json @@ -14,7 +14,8 @@ "principal.to.local.class":"kafka.security.auth.KerberosPrincipalToLocal", "super.users": "user:${kafka-env/kafka_user}", "security.inter.broker.protocol": "PLAINTEXTSASL", - "zookeeper.set.acl": "true" + "zookeeper.set.acl": "true", + "listeners": "${kafka-broker/listeners|replace(\\bPLAINTEXT\\b, PLAINTEXTSASL)}" } } ], http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py index 64e8e03..ee96cf8 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py @@ -914,13 +914,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor): def validateKAFKAConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): kafka_broker = properties validationItems = [] - + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + #Adding Ranger Plugin logic here ranger_plugin_properties = getSiteProperties(configurations, "ranger-kafka-plugin-properties") - ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] + ranger_plugin_enabled = ranger_plugin_properties['ranger-kafka-plugin-enabled'] if ranger_plugin_properties else 'No' prop_name = 'authorizer.class.name' prop_val = "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" - servicesList = [service["StackServices"]["service_name"] for service in services["services"]] if ("RANGER" in servicesList) and (ranger_plugin_enabled.lower() == 'Yes'.lower()): if kafka_broker[prop_name] != prop_val: validationItems.append({"config-name": prop_name, @@ -928,6 +928,17 @@ class HDP23StackAdvisor(HDP22StackAdvisor): "If Ranger Kafka Plugin is enabled."\ "{0} needs to be set to {1}".format(prop_name,prop_val))}) + if 'KERBEROS' in servicesList and 'security.inter.broker.protocol' in properties: + interBrokerValue = properties['security.inter.broker.protocol'] + prop_name = 'listeners' + prop_value = properties[prop_name] + if interBrokerValue and interBrokerValue not in prop_value: + validationItems.append({"config-name": "listeners", + "item": self.getWarnItem("If kerberos is enabled "\ + "{0} need to contain {1} as one of "\ + "the protocol".format(prop_name, interBrokerValue))}) + + return self.toConfigurationValidationProblems(validationItems, "kafka-broker") def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java index ee2a671..8be0eb9 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/state/kerberos/VariableReplacementHelperTest.java @@ -158,6 +158,10 @@ public class VariableReplacementHelperTest { put("realm", "UNIT.TEST"); }}); + put("kafka-broker", new HashMap<String, String>() {{ + put("listeners", "PLAINTEXT://localhost:6667"); + }}); + put("clusterHostInfo", new HashMap<String, String>() {{ put("hive_metastore_host", "host1.unit.test, host2.unit.test , host3.unit.test"); // spaces are there on purpose. }}); @@ -171,6 +175,8 @@ public class VariableReplacementHelperTest { helper.replaceVariables("hive.metastore.local=false,hive.metastore.uris=${clusterHostInfo/hive_metastore_host | each(thrift://%s:9083, \\\\,, \\s*\\,\\s*)},hive.metastore.sasl.enabled=true,hive.metastore.execute.setugi=true,hive.metastore.warehouse.dir=/apps/hive/warehouse,hive.exec.mode.local.auto=false,hive.metastore.kerberos.principal=hive/_HOST@${realm}", configurations)); Assert.assertEquals("test=unit.test", helper.replaceVariables("test=${realm|toLower()}", configurations)); + + Assert.assertEquals("PLAINTEXTSASL://localhost:6667", helper.replaceVariables("${kafka-broker/listeners|replace(\\bPLAINTEXT\\b,PLAINTEXTSASL)}", configurations)); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/ambari/blob/4b9a8168/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java index 854ce7d..099af7e 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/upgrade/UpgradeCatalog240Test.java @@ -585,6 +585,7 @@ public class UpgradeCatalog240Test { Method updateRecoveryConfigurationDML = UpgradeCatalog240.class.getDeclaredMethod("updateRecoveryConfigurationDML"); Method removeAtlasMetaserverAlert = UpgradeCatalog240.class.getDeclaredMethod("removeAtlasMetaserverAlert"); Method updateRangerHbasePluginProperties = UpgradeCatalog240.class.getDeclaredMethod("updateRangerHbasePluginProperties"); + Method updateKAFKAConfigs = UpgradeCatalog240.class.getDeclaredMethod("updateKAFKAConfigs"); Capture<String> capturedStatements = newCapture(CaptureType.ALL); @@ -634,6 +635,7 @@ public class UpgradeCatalog240Test { .addMockedMethod(updateRecoveryConfigurationDML) .addMockedMethod(removeAtlasMetaserverAlert) .addMockedMethod(updateRangerHbasePluginProperties) + .addMockedMethod(updateKAFKAConfigs) .createMock(); Field field = AbstractUpgradeCatalog.class.getDeclaredField("dbAccessor"); @@ -675,6 +677,7 @@ public class UpgradeCatalog240Test { upgradeCatalog240.removeAtlasMetaserverAlert(); upgradeCatalog240.updateRangerHbasePluginProperties(); upgradeCatalog240.adjustHiveJobTimestamps(); + upgradeCatalog240.updateKAFKAConfigs(); replay(upgradeCatalog240, dbAccessor); @@ -1160,6 +1163,64 @@ public class UpgradeCatalog240Test { } @Test + public void testUpdateKAFKAConfigs() throws Exception{ + EasyMockSupport easyMockSupport = new EasyMockSupport(); + final AmbariManagementController mockAmbariManagementController = easyMockSupport.createNiceMock(AmbariManagementController.class); + final Clusters mockClusters = easyMockSupport.createStrictMock(Clusters.class); + final Cluster mockClusterExpected = easyMockSupport.createNiceMock(Cluster.class); + + final Config kafkaBroker = easyMockSupport.createNiceMock(Config.class); + expect(kafkaBroker.getProperties()).andReturn(new HashMap<String, String>(){{ + put("listeners", "PLAINTEXT://localhost:6667,SSL://localhost:6666"); + }} + ).anyTimes(); + + final Injector mockInjector = Guice.createInjector(new AbstractModule() { + @Override + protected void configure() { + bind(AmbariManagementController.class).toInstance(mockAmbariManagementController); + bind(Clusters.class).toInstance(mockClusters); + bind(EntityManager.class).toInstance(entityManager); + bind(DBAccessor.class).toInstance(createNiceMock(DBAccessor.class)); + bind(OsFamily.class).toInstance(createNiceMock(OsFamily.class)); + bind(PasswordEncoder.class).toInstance(createNiceMock(PasswordEncoder.class)); + } + }); + + expect(mockAmbariManagementController.getClusters()).andReturn(mockClusters).once(); + expect(mockClusters.getClusters()).andReturn(new HashMap<String, Cluster>() {{ + put("normal", mockClusterExpected); + }}).atLeastOnce(); + expect(mockClusterExpected.getDesiredConfigByType("kafka-broker")).andReturn(kafkaBroker).atLeastOnce(); + expect(mockClusterExpected.getSecurityType()).andReturn(SecurityType.KERBEROS); + expect(mockClusterExpected.getServices()).andReturn(new HashMap<String, Service>() { + { + put("KAFKA", null); + } + }).atLeastOnce(); + + UpgradeCatalog240 upgradeCatalog240 = createMockBuilder(UpgradeCatalog240.class) + .withConstructor(Injector.class) + .withArgs(mockInjector) + .addMockedMethod("updateConfigurationProperties", String.class, + Map.class, boolean.class, boolean.class) + .createMock(); + + Map<String, String> expectedUpdates = new HashMap<>(); + expectedUpdates.put("listeners", "PLAINTEXTSASL://localhost:6667,SSL://localhost:6666"); + + upgradeCatalog240.updateConfigurationProperties("kafka-broker", expectedUpdates, + true, false); + expectLastCall().once(); + + easyMockSupport.replayAll(); + replay(upgradeCatalog240); + upgradeCatalog240.updateKAFKAConfigs(); + easyMockSupport.verifyAll(); + } + + + @Test public void testSparkConfigUpdate() throws Exception{ Map<String, String> oldPropertiesSparkDefaults = new HashMap<String, String>() {
