Repository: ambari Updated Branches: refs/heads/branch-2.4 e2864ca26 -> 9a606ef2f
AMBARI-17479. authorizer.class.name not being set on secure kafka clusters (rlevas) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9a606ef2 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9a606ef2 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9a606ef2 Branch: refs/heads/branch-2.4 Commit: 9a606ef2fd356fa611d7f83dec7537b263e36964 Parents: e2864ca Author: Robert Levas <rle...@hortonworks.com> Authored: Thu Jul 7 12:07:10 2016 -0400 Committer: Robert Levas <rle...@hortonworks.com> Committed: Thu Jul 7 12:07:16 2016 -0400 ---------------------------------------------------------------------- .../stacks/HDP/2.0.6/services/stack_advisor.py | 14 +++ .../stacks/HDP/2.3/services/stack_advisor.py | 118 ++++++++++++------- .../stacks/2.3/common/test_stack_advisor.py | 17 ++- 3 files changed, 106 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py index 8358438..21594d1 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py @@ -1649,6 +1649,20 @@ def getOldValue(self, services, configType, propertyName): return None # Validation helper methods +def isSecurityEnabled(services): + """ + Determines if security is enabled by testing the value of cluster-env/security enabled. + + If the property exists and is equal to "true", then is it enabled; otherwise is it assumed to be + disabled. + + :param services: the services structure containing the current configurations + :return: true if security is enabled; otherwise false + """ + return "cluster-env" in services["configurations"] \ + and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true" + def getSiteProperties(configurations, siteName): siteConfig = configurations.get(siteName) if siteConfig is None: http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py index 879008b..858fe16 100644 --- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py +++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py @@ -293,27 +293,85 @@ class HDP23StackAdvisor(HDP22StackAdvisor): putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 'delete', 'true') def recommendKAFKAConfigurations(self, configurations, clusterData, services, hosts): + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] kafka_broker = getServicesSiteProperties(services, "kafka-broker") - # kerberos security for kafka is decided from `security.inter.broker.protocol` property value - security_enabled = (kafka_broker is not None and 'security.inter.broker.protocol' in kafka_broker - and 'SASL' in kafka_broker['security.inter.broker.protocol']) + security_enabled = isSecurityEnabled(services) + putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", services) putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", services) putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, "kafka-broker") + if security_enabled: + kafka_env = getServicesSiteProperties(services, "kafka-env") + kafka_user = kafka_env.get('kafka_user') if kafka_env is not None else None + + if kafka_user is not None: + kafka_super_users = kafka_broker.get('super.users') if kafka_broker is not None else None + + # kafka_super_super_users is expected to be formatted as: User:user1;User:user2 + if kafka_super_users is not None and kafka_super_users != '': + # Parse kafka_super_users to get a set of unique user names and rebuild the property value + user_names = set() + user_names.add(kafka_user) + for match in re.findall('User:([^;]*)', kafka_super_users): + user_names.add(match) + kafka_super_users = 'User:' + ";User:".join(user_names) + else: + kafka_super_users = 'User:' + kafka_user + + putKafkaBrokerProperty("super.users", kafka_super_users) + + putKafkaBrokerProperty("principal.to.local.class", "kafka.security.auth.KerberosPrincipalToLocal") + putKafkaBrokerProperty("security.inter.broker.protocol", "PLAINTEXTSASL") + putKafkaBrokerProperty("zookeeper.set.acl", "true") + + else: # not security_enabled + # remove unneeded properties + putKafkaBrokerAttributes('super.users', 'delete', 'true') + putKafkaBrokerAttributes('principal.to.local.class', 'delete', 'true') + putKafkaBrokerAttributes('security.inter.broker.protocol', 'delete', 'true') + + # Update ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled to match ranger-env/ranger-kafka-plugin-enabled + if "ranger-env" in services["configurations"] \ + and "ranger-kafka-plugin-properties" in services["configurations"] \ + and "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services) + ranger_kafka_plugin_enabled = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"] + putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", ranger_kafka_plugin_enabled) + + # Determine if the Ranger/Kafka Plugin is enabled + ranger_plugin_enabled = "RANGER" in servicesList + # Only if the RANGER service is installed.... + if ranger_plugin_enabled: + # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled, + # determine if the Ranger/Kafka plug-in enabled enabled or not + if 'ranger-kafka-plugin-properties' in configurations and \ + 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']: + ranger_plugin_enabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower() == 'yes' + # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled was not changed, + # determine if the Ranger/Kafka plug-in enabled enabled or not + elif 'ranger-kafka-plugin-properties' in services['configurations'] and \ + 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']: + ranger_plugin_enabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower() == 'yes' + + # Determine the value for kafka-broker/authorizer.class.name + if ranger_plugin_enabled: + # If the Ranger plugin for Kafka is enabled, set authorizer.class.name to + # "org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" whether Kerberos is + # enabled or not. + putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer') + elif security_enabled: + putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer') + else: + putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') + #If AMS is part of Services, use the KafkaTimelineMetricsReporter for metric reporting. Default is ''. - servicesList = [service["StackServices"]["service_name"] for service in services["services"]] if "AMBARI_METRICS" in servicesList: putKafkaBrokerProperty('kafka.metrics.reporters', 'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter') - if "ranger-env" in services["configurations"] and "ranger-kafka-plugin-properties" in services["configurations"] and \ - "ranger-kafka-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: - putKafkaRangerPluginProperty = self.putProperty(configurations, "ranger-kafka-plugin-properties", services) - rangerEnvKafkaPluginProperty = services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"] - putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", rangerEnvKafkaPluginProperty) - - if 'ranger-kafka-plugin-properties' in services['configurations'] and ('ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']): + if ranger_plugin_enabled: kafkaLog4jRangerLines = [{ "name": "log4j.appender.rangerAppender", "value": "org.apache.log4j.DailyRollingFileAppender" @@ -339,37 +397,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor): "value": "INFO, rangerAppender" }] - rangerPluginEnabled='' - if 'ranger-kafka-plugin-properties' in configurations and 'ranger-kafka-plugin-enabled' in configurations['ranger-kafka-plugin-properties']['properties']: - rangerPluginEnabled = configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] - elif 'ranger-kafka-plugin-properties' in services['configurations'] and 'ranger-kafka-plugin-enabled' in services['configurations']['ranger-kafka-plugin-properties']['properties']: - rangerPluginEnabled = services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'] - - if rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower(): - # recommend authorizer.class.name - putKafkaBrokerProperty("authorizer.class.name", 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer') - # change kafka-log4j when ranger plugin is installed - - if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']: - kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content'] - for item in range(len(kafkaLog4jRangerLines)): - if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent: - kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"] - putKafkaLog4jProperty("content",kafkaLog4jContent) - - - else: - # Kerberized Cluster with Ranger plugin disabled - if security_enabled and 'kafka-broker' in services['configurations'] and 'authorizer.class.name' in services['configurations']['kafka-broker']['properties'] and \ - services['configurations']['kafka-broker']['properties']['authorizer.class.name'] == 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer': - putKafkaBrokerProperty("authorizer.class.name", 'kafka.security.auth.SimpleAclAuthorizer') - # Non-kerberos Cluster with Ranger plugin disabled - else: - putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') - - # Non-Kerberos Cluster without Ranger - elif not security_enabled: - putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true') + # change kafka-log4j when ranger plugin is installed + if 'kafka-log4j' in services['configurations'] and 'content' in services['configurations']['kafka-log4j']['properties']: + kafkaLog4jContent = services['configurations']['kafka-log4j']['properties']['content'] + for item in range(len(kafkaLog4jRangerLines)): + if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent: + kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + '=' + kafkaLog4jRangerLines[item]["value"] + putKafkaLog4jProperty("content",kafkaLog4jContent) def recommendRangerKMSConfigurations(self, configurations, clusterData, services, hosts): servicesList = [service["StackServices"]["service_name"] for service in services["services"]] http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py index 2d98558..94ca579 100644 --- a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py +++ b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py @@ -294,6 +294,12 @@ class TestHDP23StackAdvisor(TestCase): }, { "StackServices": { + "service_name": "RANGER", + "service_version": "0.5.0.2.3" + } + }, + { + "StackServices": { "service_name": "AMBARI_METRICS" }, "components": [{ @@ -318,6 +324,12 @@ class TestHDP23StackAdvisor(TestCase): "core-site": { "properties": { }, }, + "cluster-env": { + "properties": { + "security_enabled" : "true" + }, + "property_attributes": {} + }, "kafka-broker": { "properties": { "authorizer.class.name" : "kafka.security.auth.SimpleAclAuthorizer" @@ -338,10 +350,12 @@ class TestHDP23StackAdvisor(TestCase): } # Test authorizer.class.name with Ranger Kafka plugin disabled in non-kerberos environment + services['configurations']['cluster-env']['properties']['security_enabled'] = "false" self.stackAdvisor.recommendKAFKAConfigurations(configurations, clusterData, services, None) self.assertEquals(configurations['kafka-broker']['property_attributes']['authorizer.class.name'], {'delete': 'true'}, "Test authorizer.class.name with Ranger Kafka plugin is disabled in non-kerberos environment") # Test authorizer.class.name with Ranger Kafka plugin disabled in kerberos environment + services['configurations']['cluster-env']['properties']['security_enabled'] = "true" configurations['kafka-broker']['properties'] = {} configurations['kafka-broker']['property_attributes'] = {} services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol'] = 'PLAINTEXTSASL' @@ -350,6 +364,7 @@ class TestHDP23StackAdvisor(TestCase): self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'], 'kafka.security.auth.SimpleAclAuthorizer' , "Test authorizer.class.name with Ranger Kafka plugin disabled in kerberos environment") # Test authorizer.class.name with Ranger Kafka plugin enabled in non-kerberos environment + services['configurations']['cluster-env']['properties']['security_enabled'] = "false" configurations['kafka-broker']['properties'] = {} configurations['kafka-broker']['property_attributes'] = {} del services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol'] @@ -358,7 +373,7 @@ class TestHDP23StackAdvisor(TestCase): self.stackAdvisor.recommendKAFKAConfigurations(configurations, clusterData, services, None) self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'], 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer', "Test authorizer.class.name with Ranger Kafka plugin enabled in kerberos environment") - # Test authorizer.class.name with Ranger Kafka plugin enabled in kerberos environment + services['configurations']['cluster-env']['properties']['security_enabled'] = "false" configurations['kafka-broker']['properties'] = {} configurations['kafka-broker']['property_attributes'] = {} services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol'] = 'PLAINTEXTSASL'