Repository: ambari
Updated Branches:
  refs/heads/branch-2.4 e2864ca26 -> 9a606ef2f


AMBARI-17479. authorizer.class.name not being set on secure kafka clusters 
(rlevas)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/9a606ef2
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/9a606ef2
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/9a606ef2

Branch: refs/heads/branch-2.4
Commit: 9a606ef2fd356fa611d7f83dec7537b263e36964
Parents: e2864ca
Author: Robert Levas <rle...@hortonworks.com>
Authored: Thu Jul 7 12:07:10 2016 -0400
Committer: Robert Levas <rle...@hortonworks.com>
Committed: Thu Jul 7 12:07:16 2016 -0400

----------------------------------------------------------------------
 .../stacks/HDP/2.0.6/services/stack_advisor.py  |  14 +++
 .../stacks/HDP/2.3/services/stack_advisor.py    | 118 ++++++++++++-------
 .../stacks/2.3/common/test_stack_advisor.py     |  17 ++-
 3 files changed, 106 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
index 8358438..21594d1 100644
--- 
a/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
+++ 
b/ambari-server/src/main/resources/stacks/HDP/2.0.6/services/stack_advisor.py
@@ -1649,6 +1649,20 @@ def getOldValue(self, services, configType, 
propertyName):
   return None
 
 # Validation helper methods
+def isSecurityEnabled(services):
+  """
+  Determines if security is enabled by testing the value of 
cluster-env/security_enabled.
+
+  If the property exists and is equal to "true", then it is enabled; otherwise 
it is assumed to be
+  disabled.
+
+  :param services: the services structure containing the current configurations
+  :return: true if security is enabled; otherwise false
+  """
+  return "cluster-env" in services["configurations"] \
+         and "security_enabled" in 
services["configurations"]["cluster-env"]["properties"] \
+         and 
services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower()
 == "true"
+
 def getSiteProperties(configurations, siteName):
   siteConfig = configurations.get(siteName)
   if siteConfig is None:

http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
index 879008b..858fe16 100644
--- a/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/HDP/2.3/services/stack_advisor.py
@@ -293,27 +293,85 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
       
putHdfsSitePropertyAttribute('dfs.namenode.inode.attributes.provider.class', 
'delete', 'true')
 
   def recommendKAFKAConfigurations(self, configurations, clusterData, 
services, hosts):
+
+    servicesList = [service["StackServices"]["service_name"] for service in 
services["services"]]
     kafka_broker = getServicesSiteProperties(services, "kafka-broker")
 
-    # kerberos security for kafka is decided from 
`security.inter.broker.protocol` property value
-    security_enabled = (kafka_broker is not None and 
'security.inter.broker.protocol' in  kafka_broker
-                        and 'SASL' in 
kafka_broker['security.inter.broker.protocol'])
+    security_enabled = isSecurityEnabled(services)
+
     putKafkaBrokerProperty = self.putProperty(configurations, "kafka-broker", 
services)
     putKafkaLog4jProperty = self.putProperty(configurations, "kafka-log4j", 
services)
     putKafkaBrokerAttributes = self.putPropertyAttribute(configurations, 
"kafka-broker")
 
+    if security_enabled:
+      kafka_env = getServicesSiteProperties(services, "kafka-env")
+      kafka_user = kafka_env.get('kafka_user') if kafka_env is not None else 
None
+
+      if kafka_user is not None:
+        kafka_super_users = kafka_broker.get('super.users') if kafka_broker is 
not None else None
+
+        # kafka_super_users is expected to be formatted as:  
User:user1;User:user2
+        if kafka_super_users is not None and kafka_super_users != '':
+          # Parse kafka_super_users to get a set of unique user names and 
rebuild the property value
+          user_names = set()
+          user_names.add(kafka_user)
+          for match in re.findall('User:([^;]*)', kafka_super_users):
+            user_names.add(match)
+          kafka_super_users = 'User:' + ";User:".join(user_names)
+        else:
+          kafka_super_users = 'User:' + kafka_user
+
+        putKafkaBrokerProperty("super.users", kafka_super_users)
+
+      putKafkaBrokerProperty("principal.to.local.class", 
"kafka.security.auth.KerberosPrincipalToLocal")
+      putKafkaBrokerProperty("security.inter.broker.protocol", "PLAINTEXTSASL")
+      putKafkaBrokerProperty("zookeeper.set.acl", "true")
+
+    else:  # not security_enabled
+      # remove unneeded properties
+      putKafkaBrokerAttributes('super.users', 'delete', 'true')
+      putKafkaBrokerAttributes('principal.to.local.class', 'delete', 'true')
+      putKafkaBrokerAttributes('security.inter.broker.protocol', 'delete', 
'true')
+
+    # Update ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled to 
match ranger-env/ranger-kafka-plugin-enabled
+    if "ranger-env" in services["configurations"] \
+      and "ranger-kafka-plugin-properties" in services["configurations"] \
+      and "ranger-kafka-plugin-enabled" in 
services["configurations"]["ranger-env"]["properties"]:
+      putKafkaRangerPluginProperty = self.putProperty(configurations, 
"ranger-kafka-plugin-properties", services)
+      ranger_kafka_plugin_enabled = 
services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
+      putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", 
ranger_kafka_plugin_enabled)
+
+    # Determine if the Ranger/Kafka Plugin is enabled
+    ranger_plugin_enabled = "RANGER" in servicesList
+    # Only if the RANGER service is installed....
+    if ranger_plugin_enabled:
+      # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled was changed,
+      # determine if the Ranger/Kafka plug-in is enabled or not
+      if 'ranger-kafka-plugin-properties' in configurations and \
+          'ranger-kafka-plugin-enabled' in 
configurations['ranger-kafka-plugin-properties']['properties']:
+        ranger_plugin_enabled = 
configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower()
 == 'yes'
+      # If ranger-kafka-plugin-properties/ranger-kafka-plugin-enabled was not 
changed,
+      # determine if the Ranger/Kafka plug-in is enabled or not
+      elif 'ranger-kafka-plugin-properties' in services['configurations'] and \
+          'ranger-kafka-plugin-enabled' in 
services['configurations']['ranger-kafka-plugin-properties']['properties']:
+        ranger_plugin_enabled = 
services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled'].lower()
 == 'yes'
+
+    # Determine the value for kafka-broker/authorizer.class.name
+    if ranger_plugin_enabled:
+      # If the Ranger plugin for Kafka is enabled, set authorizer.class.name to
+      # 
"org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer" 
whether Kerberos is
+      # enabled or not.
+      putKafkaBrokerProperty("authorizer.class.name", 
'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
+    elif security_enabled:
+      putKafkaBrokerProperty("authorizer.class.name", 
'kafka.security.auth.SimpleAclAuthorizer')
+    else:
+      putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
+
     #If AMS is part of Services, use the KafkaTimelineMetricsReporter for 
metric reporting. Default is ''.
-    servicesList = [service["StackServices"]["service_name"] for service in 
services["services"]]
     if "AMBARI_METRICS" in servicesList:
       putKafkaBrokerProperty('kafka.metrics.reporters', 
'org.apache.hadoop.metrics2.sink.kafka.KafkaTimelineMetricsReporter')
 
-    if "ranger-env" in services["configurations"] and 
"ranger-kafka-plugin-properties" in services["configurations"] and \
-        "ranger-kafka-plugin-enabled" in 
services["configurations"]["ranger-env"]["properties"]:
-      putKafkaRangerPluginProperty = self.putProperty(configurations, 
"ranger-kafka-plugin-properties", services)
-      rangerEnvKafkaPluginProperty = 
services["configurations"]["ranger-env"]["properties"]["ranger-kafka-plugin-enabled"]
-      putKafkaRangerPluginProperty("ranger-kafka-plugin-enabled", 
rangerEnvKafkaPluginProperty)
-
-    if 'ranger-kafka-plugin-properties' in services['configurations'] and 
('ranger-kafka-plugin-enabled' in 
services['configurations']['ranger-kafka-plugin-properties']['properties']):
+    if ranger_plugin_enabled:
       kafkaLog4jRangerLines = [{
         "name": "log4j.appender.rangerAppender",
         "value": "org.apache.log4j.DailyRollingFileAppender"
@@ -339,37 +397,13 @@ class HDP23StackAdvisor(HDP22StackAdvisor):
           "value": "INFO, rangerAppender"
         }]
 
-      rangerPluginEnabled=''
-      if 'ranger-kafka-plugin-properties' in configurations and 
'ranger-kafka-plugin-enabled' in  
configurations['ranger-kafka-plugin-properties']['properties']:
-        rangerPluginEnabled = 
configurations['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
-      elif 'ranger-kafka-plugin-properties' in services['configurations'] and 
'ranger-kafka-plugin-enabled' in 
services['configurations']['ranger-kafka-plugin-properties']['properties']:
-        rangerPluginEnabled = 
services['configurations']['ranger-kafka-plugin-properties']['properties']['ranger-kafka-plugin-enabled']
-
-      if  rangerPluginEnabled and rangerPluginEnabled.lower() == "Yes".lower():
-        # recommend authorizer.class.name
-        putKafkaBrokerProperty("authorizer.class.name", 
'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer')
-        # change kafka-log4j when ranger plugin is installed
-
-        if 'kafka-log4j' in services['configurations'] and 'content' in 
services['configurations']['kafka-log4j']['properties']:
-          kafkaLog4jContent = 
services['configurations']['kafka-log4j']['properties']['content']
-          for item in range(len(kafkaLog4jRangerLines)):
-            if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
-              kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + 
'=' + kafkaLog4jRangerLines[item]["value"]
-          putKafkaLog4jProperty("content",kafkaLog4jContent)
-
-
-      else:
-        # Kerberized Cluster with Ranger plugin disabled
-        if security_enabled and 'kafka-broker' in services['configurations'] 
and 'authorizer.class.name' in 
services['configurations']['kafka-broker']['properties'] and \
-          
services['configurations']['kafka-broker']['properties']['authorizer.class.name']
 == 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer':
-          putKafkaBrokerProperty("authorizer.class.name", 
'kafka.security.auth.SimpleAclAuthorizer')
-        # Non-kerberos Cluster with Ranger plugin disabled
-        else:
-          putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
-
-    # Non-Kerberos Cluster without Ranger
-    elif not security_enabled:
-      putKafkaBrokerAttributes('authorizer.class.name', 'delete', 'true')
+      # change kafka-log4j when ranger plugin is installed
+      if 'kafka-log4j' in services['configurations'] and 'content' in 
services['configurations']['kafka-log4j']['properties']:
+        kafkaLog4jContent = 
services['configurations']['kafka-log4j']['properties']['content']
+        for item in range(len(kafkaLog4jRangerLines)):
+          if kafkaLog4jRangerLines[item]["name"] not in kafkaLog4jContent:
+            kafkaLog4jContent+= '\n' + kafkaLog4jRangerLines[item]["name"] + 
'=' + kafkaLog4jRangerLines[item]["value"]
+        putKafkaLog4jProperty("content",kafkaLog4jContent)
 
   def recommendRangerKMSConfigurations(self, configurations, clusterData, 
services, hosts):
     servicesList = [service["StackServices"]["service_name"] for service in 
services["services"]]

http://git-wip-us.apache.org/repos/asf/ambari/blob/9a606ef2/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py 
b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
index 2d98558..94ca579 100644
--- a/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.3/common/test_stack_advisor.py
@@ -294,6 +294,12 @@ class TestHDP23StackAdvisor(TestCase):
           },
           {
             "StackServices": {
+              "service_name": "RANGER",
+              "service_version": "0.5.0.2.3"
+            }
+          },
+          {
+            "StackServices": {
               "service_name": "AMBARI_METRICS"
             },
             "components": [{
@@ -318,6 +324,12 @@ class TestHDP23StackAdvisor(TestCase):
         "core-site": {
           "properties": { },
         },
+        "cluster-env": {
+          "properties": {
+            "security_enabled" : "true"
+          },
+          "property_attributes": {}
+        },
         "kafka-broker": {
           "properties": {
             "authorizer.class.name" : "kafka.security.auth.SimpleAclAuthorizer"
@@ -338,10 +350,12 @@ class TestHDP23StackAdvisor(TestCase):
     }
 
     # Test authorizer.class.name with Ranger Kafka plugin disabled in 
non-kerberos environment
+    
services['configurations']['cluster-env']['properties']['security_enabled'] = 
"false"
     self.stackAdvisor.recommendKAFKAConfigurations(configurations, 
clusterData, services, None)
     
self.assertEquals(configurations['kafka-broker']['property_attributes']['authorizer.class.name'],
 {'delete': 'true'}, "Test authorizer.class.name with Ranger Kafka plugin is 
disabled in non-kerberos environment")
 
     # Test authorizer.class.name with Ranger Kafka plugin disabled in kerberos 
environment
+    
services['configurations']['cluster-env']['properties']['security_enabled'] = 
"true"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     
services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
 = 'PLAINTEXTSASL'
@@ -350,6 +364,7 @@ class TestHDP23StackAdvisor(TestCase):
     
self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'],
 'kafka.security.auth.SimpleAclAuthorizer' , "Test authorizer.class.name with 
Ranger Kafka plugin disabled in kerberos environment")
 
     # Test authorizer.class.name with Ranger Kafka plugin enabled in 
non-kerberos environment
+    
services['configurations']['cluster-env']['properties']['security_enabled'] = 
"false"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     del 
services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
@@ -358,7 +373,7 @@ class TestHDP23StackAdvisor(TestCase):
     self.stackAdvisor.recommendKAFKAConfigurations(configurations, 
clusterData, services, None)
     
self.assertEquals(configurations['kafka-broker']['properties']['authorizer.class.name'],
 'org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer', 
"Test authorizer.class.name with Ranger Kafka plugin enabled in kerberos 
environment")
 
-    # Test authorizer.class.name with Ranger Kafka plugin enabled in kerberos 
environment
+    
services['configurations']['cluster-env']['properties']['security_enabled'] = 
"false"
     configurations['kafka-broker']['properties'] = {}
     configurations['kafka-broker']['property_attributes'] = {}
     
services['configurations']['kafka-broker']['properties']['security.inter.broker.protocol']
 = 'PLAINTEXTSASL'

Reply via email to