Repository: ambari
Updated Branches:
  refs/heads/trunk 5bdd6cf7e -> 326cc1b2a


http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
index 4e4ef51..c7d9327 100644
--- 
a/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
+++ 
b/ambari-server/src/main/resources/stacks/HDPWIN/2.1/services/stack_advisor.py
@@ -37,6 +37,7 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
     self.modifyHeapSizeProperties()
     self.modifyNotValuableComponents()
     self.modifyComponentsNotPreferableOnServer()
+    self.modifyComponentLayoutSchemes()
 
   def modifyMastersWithMultipleInstances(self):
     """
@@ -79,6 +80,32 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
     """
     self.notPreferableOnServerComponents |= set(['STORM_UI_SERVER', 
'DRPC_SERVER', 'STORM_REST_API', 'NIMBUS'])
 
+  def modifyComponentLayoutSchemes(self):
+    """
+    Modify layout scheme dictionaries for components.
+
+    The scheme dictionary basically maps the number of hosts to
+    host index where component should exist.
+
+    Must be overridden in child class.
+    """
+    self.componentLayoutSchemes = {
+      'NAMENODE': {"else": 0},
+      'SECONDARY_NAMENODE': {"else": 1},
+      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
+
+      'HISTORYSERVER': {31: 1, "else": 2},
+      'RESOURCEMANAGER': {31: 1, "else": 2},
+
+      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
+
+      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
+      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
+      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
+      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
+      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
+    }
+
   def getComponentLayoutValidations(self, services, hosts):
     """Returns array of Validation objects about issues with hostnames 
components assigned to"""
     items = []
@@ -561,26 +588,6 @@ class HDPWIN21StackAdvisor(DefaultStackAdvisor):
                         {"config-name": 'hbase_master_heapsize', "item": 
self.validatorLessThenDefaultValue(properties, recommendedDefaults, 
'hbase_master_heapsize')}]
     return self.toConfigurationValidationProblems(validationItems, "hbase-env")
 
-
-  # TODO, move to Service Advisors.
-  def getComponentLayoutSchemes(self):
-    return {
-      'NAMENODE': {"else": 0},
-      'SECONDARY_NAMENODE': {"else": 1},
-      'HBASE_MASTER': {6: 0, 31: 2, "else": 3},
-
-      'HISTORYSERVER': {31: 1, "else": 2},
-      'RESOURCEMANAGER': {31: 1, "else": 2},
-
-      'OOZIE_SERVER': {6: 1, 31: 2, "else": 3},
-
-      'HIVE_SERVER': {6: 1, 31: 2, "else": 4},
-      'HIVE_METASTORE': {6: 1, 31: 2, "else": 4},
-      'WEBHCAT_SERVER': {6: 1, 31: 2, "else": 4},
-      'APP_TIMELINE_SERVER': {31: 1, "else": 2},
-      'FALCON_SERVER': {6: 1, 31: 2, "else": 3}
-      }
-
   def getHostsWithComponent(self, serviceName, componentName, services, hosts):
     if services is not None and hosts is not None and serviceName in 
[service["StackServices"]["service_name"] for service in services["services"]]:
       service = [serviceEntry for serviceEntry in services["services"] if 
serviceEntry["StackServices"]["service_name"] == serviceName][0]

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
index 26292d9..b72f046 100644
--- 
a/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
+++ 
b/ambari-server/src/main/resources/stacks/HDPWIN/2.2/services/stack_advisor.py
@@ -33,27 +33,6 @@ def getSiteProperties(configurations, siteName):
     return None
   return siteConfig.get("properties")
 
-def getPort(address):
-  """
-  Extracts port from the address like 0.0.0.0:1019
-  """
-  if address is None:
-    return None
-  m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
-  if m is not None:
-    return int(m.group(2))
-  else:
-    return None
-
-def isSecurePort(port):
-  """
-  Returns True if port is root-owned at *nix systems
-  """
-  if port is not None:
-    return port < 1024
-  else:
-    return False
-
 class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
 
   def __init__(self):
@@ -822,15 +801,15 @@ class HDPWIN22StackAdvisor(HDPWIN21StackAdvisor):
       data_transfer_protection = 'dfs.data.transfer.protection'
 
       try: # Params may be absent
-        privileged_dfs_dn_port = 
isSecurePort(getPort(hdfs_site[dfs_datanode_address]))
+        privileged_dfs_dn_port = 
self.isSecurePort(self.getPort(hdfs_site[dfs_datanode_address]))
       except KeyError:
         privileged_dfs_dn_port = False
       try:
-        privileged_dfs_http_port = 
isSecurePort(getPort(hdfs_site[datanode_http_address]))
+        privileged_dfs_http_port = 
self.isSecurePort(self.getPort(hdfs_site[datanode_http_address]))
       except KeyError:
         privileged_dfs_http_port = False
       try:
-        privileged_dfs_https_port = 
isSecurePort(getPort(hdfs_site[datanode_https_address]))
+        privileged_dfs_https_port = 
self.isSecurePort(self.getPort(hdfs_site[datanode_https_address]))
       except KeyError:
         privileged_dfs_https_port = False
       try:

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/main/resources/stacks/stack_advisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/stacks/stack_advisor.py 
b/ambari-server/src/main/resources/stacks/stack_advisor.py
index cc556c6..6a92934 100644
--- a/ambari-server/src/main/resources/stacks/stack_advisor.py
+++ b/ambari-server/src/main/resources/stacks/stack_advisor.py
@@ -23,8 +23,11 @@ import os
 import re
 import socket
 import traceback
+from math import ceil, floor
+from urlparse import urlparse
 
 # Local imports
+from resource_management.libraries.functions.data_structure_utils import 
get_from_dict
 from resource_management.core.exceptions import Fail
 from resource_management.core.logger import Logger
 
@@ -332,6 +335,7 @@ class DefaultStackAdvisor(StackAdvisor):
     self.notValuableComponents = set()
     self.notPreferableOnServerComponents = set()
     self.cardinalitiesDict = {}
+    self.componentLayoutSchemes = {}
     self.loaded_service_advisors = False
 
 
@@ -878,7 +882,99 @@ class DefaultStackAdvisor(StackAdvisor):
     return items
 
   def getConfigurationClusterSummary(self, servicesList, hosts, components, 
services):
-    return {}
+    """
+    Copied from HDP 2.0.6 so that it could be used by Service Advisors.
+    :return: Dictionary of memory and CPU attributes in the cluster
+    """
+    hBaseInstalled = False
+    if 'HBASE' in servicesList:
+      hBaseInstalled = True
+
+    cluster = {
+      "cpu": 0,
+      "disk": 0,
+      "ram": 0,
+      "hBaseInstalled": hBaseInstalled,
+      "components": components
+    }
+
+    if len(hosts["items"]) > 0:
+      nodeManagerHosts = self.getHostsWithComponent("YARN", "NODEMANAGER", 
services, hosts)
+      # NodeManager host with least memory is generally used in calculations 
as it will work in larger hosts.
+      if nodeManagerHosts is not None and len(nodeManagerHosts) > 0:
+        nodeManagerHost = nodeManagerHosts[0];
+        for nmHost in nodeManagerHosts:
+          if nmHost["Hosts"]["total_mem"] < 
nodeManagerHost["Hosts"]["total_mem"]:
+            nodeManagerHost = nmHost
+        host = nodeManagerHost["Hosts"]
+        cluster["referenceNodeManagerHost"] = host
+      else:
+        host = hosts["items"][0]["Hosts"]
+      cluster["referenceHost"] = host
+      cluster["cpu"] = host["cpu_count"]
+      cluster["disk"] = len(host["disk_info"])
+      cluster["ram"] = int(host["total_mem"] / (1024 * 1024))
+
+    ramRecommendations = [
+      {"os":1, "hbase":1},
+      {"os":2, "hbase":1},
+      {"os":2, "hbase":2},
+      {"os":4, "hbase":4},
+      {"os":6, "hbase":8},
+      {"os":8, "hbase":8},
+      {"os":8, "hbase":8},
+      {"os":12, "hbase":16},
+      {"os":24, "hbase":24},
+      {"os":32, "hbase":32},
+      {"os":64, "hbase":32}
+    ]
+    index = {
+      cluster["ram"] <= 4: 0,
+      4 < cluster["ram"] <= 8: 1,
+      8 < cluster["ram"] <= 16: 2,
+      16 < cluster["ram"] <= 24: 3,
+      24 < cluster["ram"] <= 48: 4,
+      48 < cluster["ram"] <= 64: 5,
+      64 < cluster["ram"] <= 72: 6,
+      72 < cluster["ram"] <= 96: 7,
+      96 < cluster["ram"] <= 128: 8,
+      128 < cluster["ram"] <= 256: 9,
+      256 < cluster["ram"]: 10
+    }[1]
+
+
+    cluster["reservedRam"] = ramRecommendations[index]["os"]
+    cluster["hbaseRam"] = ramRecommendations[index]["hbase"]
+
+
+    cluster["minContainerSize"] = {
+      cluster["ram"] <= 4: 256,
+      4 < cluster["ram"] <= 8: 512,
+      8 < cluster["ram"] <= 24: 1024,
+      24 < cluster["ram"]: 2048
+    }[1]
+
+    totalAvailableRam = cluster["ram"] - cluster["reservedRam"]
+    if cluster["hBaseInstalled"]:
+      totalAvailableRam -= cluster["hbaseRam"]
+    cluster["totalAvailableRam"] = max(512, totalAvailableRam * 1024)
+    '''containers = max(3, min (2*cores,min (1.8*DISKS,(Total available RAM) / 
MIN_CONTAINER_SIZE))))'''
+    cluster["containers"] = round(max(3,
+                                      min(2 * cluster["cpu"],
+                                          min(ceil(1.8 * cluster["disk"]),
+                                              cluster["totalAvailableRam"] / 
cluster["minContainerSize"]))))
+
+    '''ramPerContainers = max(2GB, RAM - reservedRam - hBaseRam) / 
containers'''
+    cluster["ramPerContainer"] = abs(cluster["totalAvailableRam"] / 
cluster["containers"])
+    '''If greater than 1GB, value will be in multiples of 512.'''
+    if cluster["ramPerContainer"] > 1024:
+      cluster["ramPerContainer"] = int(cluster["ramPerContainer"] / 512) * 512
+
+    cluster["mapMemory"] = int(cluster["ramPerContainer"])
+    cluster["reduceMemory"] = cluster["ramPerContainer"]
+    cluster["amMemory"] = max(cluster["mapMemory"], cluster["reduceMemory"])
+
+    return cluster
 
   def validateClusterConfigurations(self, configurations, services, hosts):
     validationItems = []
@@ -886,6 +982,12 @@ class DefaultStackAdvisor(StackAdvisor):
     return self.toConfigurationValidationProblems(validationItems, "")
 
   def toConfigurationValidationProblems(self, validationProblems, siteName):
+    """
+    Encapsulate the validation item's fields of "level" and "message" for the 
given validation's config-name.
+    :param validationProblems: List of validation problems
+    :param siteName: Config type
+    :return: List of configuration validation problems that include additional 
fields like the log level.
+    """
     result = []
     for validationProblem in validationProblems:
       validationItem = validationProblem.get("item", None)
@@ -947,7 +1049,31 @@ class DefaultStackAdvisor(StackAdvisor):
     self.validateMinMax(items, recommendedDefaults, configurations)
     return items
 
+  def validateListOfConfigUsingMethod(self, configurations, 
recommendedDefaults, services, hosts, validators):
+    """
+    Service Advisors can use this method to pass in a list of validators, each 
of which is a tuple of
+    a config type (string) and a function (pointer). Each validator is then 
executed.
+    :param validators: List of tuples like [("hadoop-env", 
someFunctionPointer), ("hdfs-site", someFunctionPointer)]
+    :return: List of validation errors
+    """
+    items = []
+    for (configType, method) in validators:
+      if configType in recommendedDefaults:
+        siteProperties = self.getSiteProperties(configurations, configType)
+        if siteProperties is not None:
+          siteRecommendations = recommendedDefaults[configType]["properties"]
+          print("SiteName: %s, method: %s" % (configType, method.__name__))
+          print("Site properties: %s" % str(siteProperties))
+          print("Recommendations: %s" % str(siteRecommendations))
+          validationItems = method(siteProperties, siteRecommendations, 
configurations, services, hosts)
+          items.extend(validationItems)
+    return items
+
   def validateConfigurationsForSite(self, configurations, recommendedDefaults, 
services, hosts, siteName, method):
+    """
+    Deprecated, please use validateListOfConfigUsingMethod
+    :return: List of validation errors by calling the corresponding method.
+    """
     if siteName in recommendedDefaults:
       siteProperties = self.getSiteProperties(configurations, siteName)
       if siteProperties is not None:
@@ -1189,7 +1315,7 @@ class DefaultStackAdvisor(StackAdvisor):
     The scheme dictionary basically maps the number of hosts to
     host index where component should exist.
     """
-    return {}
+    return self.componentLayoutSchemes
 
   def getWarnItem(self, message):
     """
@@ -1672,3 +1798,524 @@ class DefaultStackAdvisor(StackAdvisor):
 
       if recommendation:
         put_f(name, ",".join(recommendation))
+
+  def getHostNamesWithComponent(self, serviceName, componentName, services):
+    """
+    Returns the list of hostnames on which service component is installed
+    """
+    if services is not None and serviceName in 
[service["StackServices"]["service_name"] for service in services["services"]]:
+      service = [serviceEntry for serviceEntry in services["services"] if 
serviceEntry["StackServices"]["service_name"] == serviceName][0]
+      components = [componentEntry for componentEntry in service["components"] 
if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+      if (len(components) > 0 and 
len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+        componentHostnames = 
components[0]["StackServiceComponents"]["hostnames"]
+        return componentHostnames
+    return []
+
+  def getHostsWithComponent(self, serviceName, componentName, services, hosts):
+    if services is not None and hosts is not None and serviceName in 
[service["StackServices"]["service_name"] for service in services["services"]]:
+      service = [serviceEntry for serviceEntry in services["services"] if 
serviceEntry["StackServices"]["service_name"] == serviceName][0]
+      components = [componentEntry for componentEntry in service["components"] 
if componentEntry["StackServiceComponents"]["component_name"] == componentName]
+      if (len(components) > 0 and 
len(components[0]["StackServiceComponents"]["hostnames"]) > 0):
+        componentHostnames = 
components[0]["StackServiceComponents"]["hostnames"]
+        componentHosts = [host for host in hosts["items"] if 
host["Hosts"]["host_name"] in componentHostnames]
+        return componentHosts
+    return []
+
+  def getHostWithComponent(self, serviceName, componentName, services, hosts):
+    componentHosts = self.getHostsWithComponent(serviceName, componentName, 
services, hosts)
+    if (len(componentHosts) > 0):
+      return componentHosts[0]
+    return None
+
+  def getHostComponentsByCategories(self, hostname, categories, services, 
hosts):
+    components = []
+    if services is not None and hosts is not None:
+      for service in services["services"]:
+        components.extend([componentEntry for componentEntry in 
service["components"]
+                           if 
componentEntry["StackServiceComponents"]["component_category"] in categories
+                           and hostname in 
componentEntry["StackServiceComponents"]["hostnames"]])
+    return components
+
+  def get_services_list(self, services):
+    """
+    Returns available services as list
+
+    :type services dict
+    :rtype list
+    """
+    if not services:
+      return []
+
+    return [service["StackServices"]["service_name"] for service in 
services["services"]]
+
+  def getHadoopProxyUsersValidationItems(self, properties, services, hosts, 
configurations):
+    validationItems = []
+    users = self.getHadoopProxyUsers(services, hosts, configurations)
+    for user_name, user_properties in users.iteritems():
+      props = ["hadoop.proxyuser.{0}.hosts".format(user_name)]
+      if "propertyGroups" in user_properties:
+        props.append("hadoop.proxyuser.{0}.groups".format(user_name))
+
+      for prop in props:
+        validationItems.append({"config-name": prop, "item": 
self.validatorNotEmpty(properties, prop)})
+
+    return validationItems
+
+  def getHadoopProxyUsers(self, services, hosts, configurations):
+    """
+    Gets Hadoop Proxy User recommendations based on the configuration that is 
provided by
+    getServiceHadoopProxyUsersConfigurationDict.
+
+    See getServiceHadoopProxyUsersConfigurationDict
+    """
+    servicesList = self.get_services_list(services)
+    users = {}
+
+    for serviceName, serviceUserComponents in 
self.getServiceHadoopProxyUsersConfigurationDict().iteritems():
+      users.update(self._getHadoopProxyUsersForService(serviceName, 
serviceUserComponents, services, hosts, configurations))
+
+    return users
+
+  def getServiceHadoopProxyUsersConfigurationDict(self):
+    """
+    Returns a map that is used by 'getHadoopProxyUsers' to determine service
+    user properties and related components and get proxyuser recommendations.
+    This method can be overridden in stackadvisors for the further stacks to
+    add additional services or change the previous logic.
+
+    Example of the map format:
+    {
+      "serviceName": [
+        ("configTypeName1", "userPropertyName1", {"propertyHosts": "*", 
"propertyGroups": "exact string value"})
+        ("configTypeName2", "userPropertyName2", {"propertyHosts": 
["COMPONENT1", "COMPONENT2", "COMPONENT3"], "propertyGroups": "*"}),
+        ("configTypeName3", "userPropertyName3", {"propertyHosts": 
["COMPONENT1", "COMPONENT2", "COMPONENT3"]}, filterFunction)
+      ],
+      "serviceName2": [
+        ...
+    }
+
+    The third element of a tuple is a map that maps a proxy property to its 
value.
+    The key could be either 'propertyHosts' or 'propertyGroups'. (Both are 
optional)
+    If the map value is a string, then this string will be used for the 
proxyuser
+    value (e.g. 'hadoop.proxyuser.{user}.hosts' = '*').
+    Otherwise the map value should be a list or a tuple with component names.
+    All hosts with the provided components will be added
+    to the property (e.g. 'hadoop.proxyuser.{user}.hosts' = 
'host1,host2,host3')
+
+    The fourth element of the tuple is optional and if it's provided,
+    it should be a function that takes two arguments: services and hosts.
+    If it returns False, proxyusers for the tuple will not be added.
+    """
+    ALL_WILDCARD = "*"
+    HOSTS_PROPERTY = "propertyHosts"
+    GROUPS_PROPERTY = "propertyGroups"
+
+    return {
+      "HDFS":   [("hadoop-env", "hdfs_user", {HOSTS_PROPERTY: ALL_WILDCARD, 
GROUPS_PROPERTY: ALL_WILDCARD})],
+      "OOZIE":  [("oozie-env", "oozie_user", {HOSTS_PROPERTY: 
["OOZIE_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+      "HIVE":   [("hive-env", "hive_user", {HOSTS_PROPERTY: ["HIVE_SERVER", 
"HIVE_SERVER_INTERACTIVE"], GROUPS_PROPERTY: ALL_WILDCARD}),
+                 ("hive-env", "webhcat_user", {HOSTS_PROPERTY: 
["WEBHCAT_SERVER"], GROUPS_PROPERTY: ALL_WILDCARD})],
+      "YARN":   [("yarn-env", "yarn_user", {HOSTS_PROPERTY: 
["RESOURCEMANAGER"]}, lambda services, hosts: 
len(self.getHostsWithComponent("YARN", "RESOURCEMANAGER", services, hosts)) > 
1)],
+      "FALCON": [("falcon-env", "falcon_user", {HOSTS_PROPERTY: ALL_WILDCARD, 
GROUPS_PROPERTY: ALL_WILDCARD})],
+      "SPARK":  [("livy-env", "livy_user", {HOSTS_PROPERTY: ALL_WILDCARD, 
GROUPS_PROPERTY: ALL_WILDCARD})]
+    }
+
+  def _getHadoopProxyUsersForService(self, serviceName, serviceUserComponents, 
services, hosts, configurations):
+    Logger.info("Calculating Hadoop Proxy User recommendations for {0} 
service.".format(serviceName))
+    servicesList = self.get_services_list(services)
+    resultUsers = {}
+
+    if serviceName in servicesList:
+      usersComponents = {}
+      for values in serviceUserComponents:
+
+        # Filter, if 4th argument is present in the tuple
+        filterFunction = values[3:]
+        if filterFunction and not filterFunction[0](services, hosts):
+          continue
+
+        userNameConfig, userNameProperty, hostSelectorMap = values[:3]
+        user = get_from_dict(services, ("configurations", userNameConfig, 
"properties", userNameProperty), None)
+        if user:
+          usersComponents[user] = (userNameConfig, userNameProperty, 
hostSelectorMap)
+
+      for user, (userNameConfig, userNameProperty, hostSelectorMap) in 
usersComponents.iteritems():
+        proxyUsers = {"config": userNameConfig, "propertyName": 
userNameProperty}
+        for proxyPropertyName, hostSelector in hostSelectorMap.iteritems():
+          componentHostNamesString = hostSelector if isinstance(hostSelector, 
basestring) else '*'
+          if isinstance(hostSelector, (list, tuple)):
+            _, componentHostNames = self.get_data_for_proxyuser(user, 
services, configurations) # preserve old values
+            for component in hostSelector:
+              componentHosts = self.getHostsWithComponent(serviceName, 
component, services, hosts)
+              if componentHosts is not None:
+                for componentHost in componentHosts:
+                  componentHostName =  componentHost["Hosts"]["host_name"]
+                  componentHostNames.add(componentHostName)
+
+            componentHostNamesString = ",".join(sorted(componentHostNames))
+            Logger.info("Host List for [service='{0}'; user='{1}'; 
components='{2}']: {3}".format(serviceName, user, ','.join(hostSelector), 
componentHostNamesString))
+
+          if not proxyPropertyName in proxyUsers:
+            proxyUsers[proxyPropertyName] = componentHostNamesString
+
+        if not user in resultUsers:
+          resultUsers[user] = proxyUsers
+
+    return resultUsers
+
+  def recommendHadoopProxyUsers(self, configurations, services, hosts):
+    servicesList = self.get_services_list(services)
+
+    if 'forced-configurations' not in services:
+      services["forced-configurations"] = []
+
+    putCoreSiteProperty = self.putProperty(configurations, "core-site", 
services)
+    putCoreSitePropertyAttribute = self.putPropertyAttribute(configurations, 
"core-site")
+
+    users = self.getHadoopProxyUsers(services, hosts, configurations)
+
+    # Force dependencies for HIVE
+    if "HIVE" in servicesList:
+      hive_user = get_from_dict(services, ("configurations", "hive-env", 
"properties", "hive_user"), default_value=None)
+      if hive_user and get_from_dict(users, (hive_user, "propertyHosts"), 
default_value=None):
+        services["forced-configurations"].append({"type" : "core-site", "name" 
: "hadoop.proxyuser.{0}.hosts".format(hive_user)})
+
+    for user_name, user_properties in users.iteritems():
+
+      # Add properties "hadoop.proxyuser.*.hosts", "hadoop.proxyuser.*.groups" 
to core-site for all users
+      self.put_proxyuser_value(user_name, user_properties["propertyHosts"], 
services=services, configurations=configurations, 
put_function=putCoreSiteProperty)
+      Logger.info("Updated hadoop.proxyuser.{0}.hosts as : 
{1}".format(user_name, user_properties["propertyHosts"]))
+      if "propertyGroups" in user_properties:
+        self.put_proxyuser_value(user_name, user_properties["propertyGroups"], 
is_groups=True, services=services, configurations=configurations, 
put_function=putCoreSiteProperty)
+
+      # Remove old properties if user was renamed
+      userOldValue = self.getOldValue(services, user_properties["config"], 
user_properties["propertyName"])
+      if userOldValue is not None and userOldValue != user_name:
+        
putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(userOldValue), 
'delete', 'true')
+        services["forced-configurations"].append({"type" : "core-site", "name" 
: "hadoop.proxyuser.{0}.hosts".format(userOldValue)})
+        services["forced-configurations"].append({"type" : "core-site", "name" 
: "hadoop.proxyuser.{0}.hosts".format(user_name)})
+
+        if "propertyGroups" in user_properties:
+          
putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(userOldValue),
 'delete', 'true')
+          services["forced-configurations"].append({"type" : "core-site", 
"name" : "hadoop.proxyuser.{0}.groups".format(userOldValue)})
+          services["forced-configurations"].append({"type" : "core-site", 
"name" : "hadoop.proxyuser.{0}.groups".format(user_name)})
+
+    self.recommendAmbariProxyUsersForHDFS(services, configurations, 
servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute)
+
+  def recommendAmbariProxyUsersForHDFS(self, services, configurations, 
servicesList, putCoreSiteProperty, putCoreSitePropertyAttribute):
+    if "HDFS" in servicesList:
+      ambari_user = self.getAmbariUser(services)
+      ambariHostName = socket.getfqdn()
+      self.put_proxyuser_value(ambari_user, ambariHostName, services=services, 
configurations=configurations, put_function=putCoreSiteProperty)
+      self.put_proxyuser_value(ambari_user, "*", is_groups=True, 
services=services, configurations=configurations, 
put_function=putCoreSiteProperty)
+      old_ambari_user = self.getOldAmbariUser(services)
+      if old_ambari_user is not None:
+        
putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.hosts".format(old_ambari_user),
 'delete', 'true')
+        
putCoreSitePropertyAttribute("hadoop.proxyuser.{0}.groups".format(old_ambari_user),
 'delete', 'true')
+
+  def getAmbariUser(self, services):
+    ambari_user = services['ambari-server-properties']['ambari-server.user']
+    if "cluster-env" in services["configurations"] \
+        and "ambari_principal_name" in 
services["configurations"]["cluster-env"]["properties"] \
+        and "security_enabled" in 
services["configurations"]["cluster-env"]["properties"] \
+        and 
services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower()
 == "true":
+      ambari_user = 
services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+      ambari_user = ambari_user.split('@')[0]
+    return ambari_user
+
+  def getOldAmbariUser(self, services):
+    ambari_user = None
+    if "cluster-env" in services["configurations"]:
+      if "security_enabled" in 
services["configurations"]["cluster-env"]["properties"] \
+          and 
services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower()
 == "true":
+        ambari_user = 
services['ambari-server-properties']['ambari-server.user']
+      elif "ambari_principal_name" in 
services["configurations"]["cluster-env"]["properties"]:
+        ambari_user = 
services["configurations"]["cluster-env"]["properties"]["ambari_principal_name"]
+        ambari_user = ambari_user.split('@')[0]
+    return ambari_user
+
+  def put_proxyuser_value(self, user_name, value, is_groups=False, 
services=None, configurations=None, put_function=None):
+    is_wildcard_value, current_value = self.get_data_for_proxyuser(user_name, 
services, configurations, is_groups)
+    result_value = "*"
+    result_values_set = self.merge_proxyusers_values(current_value, value)
+    if len(result_values_set) > 0:
+      result_value = ",".join(sorted([val for val in result_values_set if 
val]))
+
+    if is_groups:
+      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+    else:
+      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+
+    put_function(property_name, result_value)
+
+  def get_data_for_proxyuser(self, user_name, services, configurations, 
groups=False):
+    """
+    Returns values of proxyuser properties for given user. Properties can be
+    hadoop.proxyuser.username.groups or hadoop.proxyuser.username.hosts
+    :param user_name:
+    :param services:
+    :param groups: if true, will return values for group property, not hosts
+    :return: tuple (wildcard_value, set[values]), where wildcard_value 
indicates if property value was *
+    """
+    if "core-site" in services["configurations"]:
+      coreSite = services["configurations"]["core-site"]['properties']
+    else:
+      coreSite = {}
+    if groups:
+      property_name = "hadoop.proxyuser.{0}.groups".format(user_name)
+    else:
+      property_name = "hadoop.proxyuser.{0}.hosts".format(user_name)
+    if property_name in coreSite:
+      property_value = coreSite[property_name]
+      if property_value == "*":
+        return True, set()
+      else:
+        result_values = set()
+        if "core-site" in configurations:
+          if property_name in configurations["core-site"]['properties']:
+            result_values = 
result_values.union(configurations["core-site"]['properties'][property_name].split(","))
+        result_values = result_values.union(property_value.split(","))
+        return False, result_values
+    return False, set()
+
+  def merge_proxyusers_values(self, first, second):
+    result = set()
+    def append(data):
+      if isinstance(data, str) or isinstance(data, unicode):
+        if data != "*":
+          result.update(data.split(","))
+      else:
+        result.update(data)
+    append(first)
+    append(second)
+    return result
+
+  def getOldValue(self, services, configType, propertyName):
+    if services:
+      if 'changed-configurations' in services.keys():
+        changedConfigs = services["changed-configurations"]
+        for changedConfig in changedConfigs:
+          if changedConfig["type"] == configType and changedConfig["name"]== 
propertyName and "old_value" in changedConfig:
+            return changedConfig["old_value"]
+    return None
+
+  @classmethod
+  def isSecurePort(cls, port):
+    """
+    Returns True if port is root-owned at *nix systems
+    """
+    if port is not None:
+      return port < 1024
+    else:
+      return False
+
+  @classmethod
+  def getPort(cls, address):
+    """
+    Extracts port from the address like 0.0.0.0:1019
+    """
+    if address is None:
+      return None
+    m = re.search(r'(?:http(?:s)?://)?([\w\d.]*):(\d{1,5})', address)
+    if m is not None:
+      return int(m.group(2))
+    else:
+      return None
+
+  def validatorLessThenDefaultValue(self, properties, recommendedDefaults, 
propertyName):
+    if propertyName not in recommendedDefaults:
+      # If a property name exists in say hbase-env and hbase-site (which is 
allowed), then it will exist in the
+      # "properties" dictionary, but not necessarily in the 
"recommendedDefaults" dictionary. In this case, ignore it.
+      return None
+
+    if not propertyName in properties:
+      return self.getErrorItem("Value should be set")
+    value = self.to_number(properties[propertyName])
+    if value is None:
+      return self.getErrorItem("Value should be integer")
+    defaultValue = self.to_number(recommendedDefaults[propertyName])
+    if defaultValue is None:
+      return None
+    if value < defaultValue:
+      return self.getWarnItem("Value is less than the recommended default of 
{0}".format(defaultValue))
+    return None
+
+  def validatorEqualsPropertyItem(self, properties1, propertyName1,
+                                  properties2, propertyName2,
+                                  emptyAllowed=False):
+    """
+    Checks that two properties, possibly from different config types, hold
+    the same value.
+
+    :param properties1: properties containing the first property
+    :param propertyName1: name of the first property
+    :param properties2: properties containing the second property
+    :param propertyName2: name of the second property
+    :param emptyAllowed: when False, a None value for either property is
+    reported as an error
+    :return: None when both values are equal; otherwise the item built by
+    self.getErrorItem (property missing or empty) or self.getWarnItem
+    (values differ)
+    """
+    pairs = ((properties1, propertyName1), (properties2, propertyName2))
+
+    # Both properties have to be present before their values are inspected.
+    for props, name in pairs:
+      if name not in props:
+        return self.getErrorItem("Value should be set for %s" % name)
+
+    values = []
+    for props, name in pairs:
+      current = props.get(name)
+      if current is None and not emptyAllowed:
+        return self.getErrorItem("Empty value for %s" % name)
+      values.append(current)
+
+    if values[0] != values[1]:
+      return self.getWarnItem("It is recommended to set equal values "
+                              "for properties {0} and {1}".format(propertyName1, propertyName2))
+    return None
+
+  def validatorEqualsToRecommendedItem(self, properties, recommendedDefaults,
+                                       propertyName):
+    """
+    Checks that a property is set exactly to its recommended value.
+
+    :param properties: current property values, keyed by property name
+    :param recommendedDefaults: recommended values, keyed by property name
+    :param propertyName: name of the property to check
+    :return: None when the value matches the recommendation; otherwise the
+    item built by self.getErrorItem (property or recommendation missing) or
+    self.getWarnItem (value differs from the recommendation)
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set for %s" % propertyName)
+    if propertyName not in recommendedDefaults:
+      return self.getErrorItem("Value should be recommended for %s" % propertyName)
+
+    expected = recommendedDefaults.get(propertyName)
+    if properties.get(propertyName) == expected:
+      return None
+    return self.getWarnItem("It is recommended to set value {0} "
+                            "for property {1}".format(expected, propertyName))
+
+  def validatorNotEmpty(self, properties, propertyName):
+    """
+    Checks that a property is present and holds a non-empty value.
+
+    :param properties: current property values, keyed by property name
+    :param propertyName: name of the property to check
+    :return: None when the value is truthy; otherwise the item built by
+    self.getErrorItem (property missing) or self.getWarnItem (value empty)
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set for {0}".format(propertyName))
+    if properties.get(propertyName):
+      return None
+    return self.getWarnItem("Empty value for {0}".format(propertyName))
+
+  def validatorNotRootFs(self, properties, recommendedDefaults, propertyName, hostInfo):
+    """
+    Warns when a file:// directory property resolves to the root partition
+    although the host's first preferred mount point is a different one.
+
+    :param properties: current property values, keyed by property name
+    :param recommendedDefaults: recommended values, keyed by property name
+    :param propertyName: name of the directory property to check
+    :param hostInfo: host description; its "disk_info" entries are read for
+    their "mountpoint" values
+    :return: None when there is nothing to report; otherwise the item built
+    by self.getErrorItem (property missing) or self.getWarnItem
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set")
+
+    location = properties[propertyName]
+    # Only local file:// paths are checked, and the recommended value itself is never questioned.
+    if not location.startswith("file://") or location == recommendedDefaults.get(propertyName):
+      return None
+
+    location = re.sub("^file://", "", location, count=1)
+    knownMounts = [disk["mountpoint"] for disk in hostInfo["disk_info"]]
+    mountPoint = DefaultStackAdvisor.getMountPointForDir(location, knownMounts)
+
+    # The preferred mount points are only looked up when the directory really lives on "/".
+    if "/" == mountPoint and self.getPreferredMountPoints(hostInfo)[0] != mountPoint:
+      return self.getWarnItem("It is not recommended to use root partition for {0}".format(propertyName))
+
+    return None
+
+  def validatorEnoughDiskSpace(self, properties, propertyName, hostInfo, reqiuredDiskSpace):
+    """
+    Warns when the partition holding a file:// directory property has less
+    available space than required.
+
+    :param properties: current property values, keyed by property name
+    :param propertyName: name of the directory property to check
+    :param hostInfo: host description; "disk_info" entries are read for their
+    "mountpoint" and "available" values, "host_name" is used in messages
+    :param reqiuredDiskSpace: required space, in the same unit as "available"
+    (presumably KB, since it is divided by 1048576 to print GB - TODO confirm)
+    :return: None when the value is not a file:// path or enough space is
+    available; otherwise the item built by self.getErrorItem (property
+    missing, no disk info, no mount point matching the directory) or
+    self.getWarnItem (not enough space)
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set")
+    dirPath = properties[propertyName]
+    if not dirPath.startswith("file://"):
+      return None
+
+    dirPath = re.sub("^file://", "", dirPath, count=1)
+    mountPoints = {}
+    for diskInfo in hostInfo["disk_info"]:
+      mountPoints[diskInfo["mountpoint"]] = self.to_number(diskInfo["available"])
+
+    # Without any disk info there is nothing to resolve the directory against.
+    if not mountPoints:
+      return self.getErrorItem("No disk info found on host %s" % hostInfo["host_name"])
+
+    mountPoint = DefaultStackAdvisor.getMountPointForDir(dirPath, mountPoints.keys())
+    # getMountPointForDir returns None when no known mount point is a prefix of the directory
+    # (e.g. a relative path); report that instead of failing on the dictionary lookup below.
+    if mountPoint is None:
+      return self.getErrorItem("No mount point found for directory %s. Mount points: %s" %
+                               (dirPath, ", ".join(mountPoints.keys())))
+
+    if mountPoints[mountPoint] < reqiuredDiskSpace:
+      msg = "Ambari Metrics disk space requirements not met. \n" \
+            "Recommended disk space for partition {0} is {1}G"
+      return self.getWarnItem(msg.format(mountPoint, reqiuredDiskSpace/1048576)) # in Gb
+    return None
+
+  @classmethod
+  def is_valid_host_port_authority(cls, target):
+    """
+    Tells whether target carries both a host name and a port.
+
+    :param target: "host:port" string, with or without a leading scheme
+    :return: True when urlparse finds a hostname and a port in target;
+    False otherwise, including when parsing raises ValueError
+    """
+    # urlparse only recognizes the network location after a scheme, so a placeholder one is added if needed.
+    if "://" not in target:
+      target = "dummyscheme://"+target
+    try:
+      parsed = urlparse(target)
+      return parsed.hostname is not None and parsed.port is not None
+    except ValueError:
+      return False
+
+  @classmethod
+  def getMountPointForDir(cls, dir, mountPoints):
+    """
+    Finds the mount point that most closely contains a directory.
+
+    :param dir: Directory to check, even if it doesn't exist. A leading
+    "file://" is dropped and the path is stripped and lower-cased before
+    matching.
+    :param mountPoints: iterable of mount point paths
+    :return: The matching mount point with the most path segments (the first
+    one wins on a tie), or None when "dir" is empty or None, or when no
+    mount point is a prefix of it.
+    """
+    if not dir:
+      return None
+
+    # A trailing separator on both sides keeps the mount point "/hadoop" from matching the path "/hadoop1".
+    target = os.path.join(re.sub("^file://", "", dir, count=1).strip().lower(), "")
+
+    # For the path "/hadoop/hdfs/data" the mounts "/", "/hadoop/hdfs" and "/hadoop/hdfs/data" all match,
+    # so the one with the greatest number of segments is kept.
+    closest = None
+    closestDepth = -1
+    for candidate in mountPoints:
+      prefix = os.path.join(candidate, "")
+      if not target.startswith(prefix):
+        continue
+      depth = prefix.count(os.path.sep)
+      if closest is None or closestDepth < depth:
+        closest = candidate
+        closestDepth = depth
+
+    return closest
+
+  def validateMinMemorySetting(self, properties, defaultValue, propertyName):
+    """
+    Warns when a memory property is set below the minimum recommended value.
+
+    :param properties: current property values, keyed by property name
+    :param defaultValue: minimum recommended value; must convert to an int
+    :param propertyName: name of the property to check
+    :return: None when the value is not below the minimum or when either
+    value cannot be compared as a number; otherwise the item built by
+    self.getErrorItem (property missing, value or default is None) or
+    self.getWarnItem (value below the minimum)
+    """
+    if propertyName not in properties:
+      return self.getErrorItem("Value should be set")
+    if defaultValue is None:
+      return self.getErrorItem("Config's default value can't be null or undefined")
+
+    value = properties[propertyName]
+    if value is None:
+      return self.getErrorItem("Value can't be null or undefined")
+
+    # Only conversion and comparison failures are treated as "cannot validate"; anything else
+    # (including KeyboardInterrupt/SystemExit) is no longer swallowed.
+    try:
+      valueInt = self.to_number(value)
+      # TODO: generify for other use cases
+      defaultValueInt = int(str(defaultValue).strip())
+      belowMinimum = valueInt < defaultValueInt
+    except (TypeError, ValueError):
+      return None
+
+    if belowMinimum:
+      return self.getWarnItem("Value is less than the minimum recommended default of -Xmx" + str(defaultValue))
+    return None
+
+  @classmethod
+  def checkXmxValueFormat(cls, value):
+    """
+    Tells whether a JVM options string contains exactly one -Xmx setting,
+    i.e. "-Xmx" followed by digits and an optional size suffix
+    (b, k, m, g, p or t, in either case).
+
+    :param value: JVM options string to inspect
+    :return: True when exactly one -Xmx occurrence is found, False otherwise
+    """
+    # Raw string: "\d" is a regex class, not a string escape.
+    p = re.compile(r'-Xmx(\d+)(b|k|m|g|p|t|B|K|M|G|P|T)?')
+    return len(p.findall(value)) == 1
+
+  @classmethod
+  def getXmxSize(cls, value):
+    """
+    Extracts the size given to the first -Xmx setting of a JVM options string.
+
+    :param value: JVM options string containing at least one -Xmx setting
+    :return: the digits followed by the single character right after them,
+    lower-cased (a size suffix such as "m", a space, or nothing at the end
+    of the string), e.g. "1024m"
+    :raises IndexError: when value contains no -Xmx setting
+    """
+    # The pattern has two groups, so each match is always a (digits, suffix) pair;
+    # the suffix is a space or a size formatter (b|k|m|g etc), or "" at the end of the string.
+    digits, suffix = re.findall(r"-Xmx(\d+)(.?)", value)[0]
+    return digits + suffix.lower()
+
+  @classmethod
+  def formatXmxSizeToBytes(cls, value):
+    """
+    Converts an -Xmx size such as "1024m" into a number of bytes.
+
+    :param value: digits followed by an optional size suffix (b, k, m, g, t
+    or p, in either case); a trailing space or digit means bytes
+    :return: the size in bytes, or 0 for an empty string
+    :raises KeyError: when the last character is not a digit, a space or a
+    known size suffix
+    """
+    value = value.lower()
+    if len(value) == 0:
+      return 0
+    modifier = value[-1]
+
+    # No explicit suffix means the number is already expressed in bytes.
+    if modifier == ' ' or modifier in "0123456789":
+      modifier = 'b'
+    multipliers = {
+      'b': 1,
+      'k': 1024,
+      'm': 1024 * 1024,
+      'g': 1024 * 1024 * 1024,
+      't': 1024 * 1024 * 1024 * 1024,
+      'p': 1024 * 1024 * 1024 * 1024 * 1024
+    }
+    return cls.to_number(value) * multipliers[modifier]
+
+  @classmethod
+  def to_number(cls, s):
+    """
+    Converts a string to an int using only the digits it contains.
+
+    Every non-digit character is dropped before conversion, so "1024m"
+    gives 1024, but a sign or decimal point is lost as well ("-5" gives 5,
+    "1.5" gives 15).
+
+    :param s: string to convert
+    :return: the int built from the digits of s, or None when s contains
+    no digit
+    """
+    try:
+      # Raw string: "\D" is a regex class, not a string escape.
+      return int(re.sub(r"\D", "", s))
+    except ValueError:
+      return None

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/TestStackAdvisor.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/test/python/TestStackAdvisor.py 
b/ambari-server/src/test/python/TestStackAdvisor.py
index 87d2d15..d48def3 100644
--- a/ambari-server/src/test/python/TestStackAdvisor.py
+++ b/ambari-server/src/test/python/TestStackAdvisor.py
@@ -270,8 +270,24 @@ class TestStackAdvisorInitialization(TestCase):
     }
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1"}},
-        {"Hosts": {"host_name": "host2"}}
+        {"Hosts": {"host_name": "host1",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         },
+        {"Hosts": {"host_name": "host2",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         }
       ]
     }
     actualValidateConfigResponse = 
default_stack_advisor.validateConfigurations(services, hosts)
@@ -343,8 +359,14 @@ class TestStackAdvisorInitialization(TestCase):
     # Test with maintenance_state. One host is in maintenance mode.
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1", "maintenance_state":"OFF"}},
-        {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+        {"Hosts": {"host_name": "host1",
+                   "maintenance_state":"OFF",
+                   "cpu_count": 1}
+         },
+        {"Hosts": {"host_name": "host2",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1}
+         }
       ]
     }
 
@@ -397,8 +419,26 @@ class TestStackAdvisorInitialization(TestCase):
     # Test with maintenance_state. Both hosts are in maintenance mode.
     hosts= {
       "items": [
-        {"Hosts": {"host_name": "host1", "maintenance_state":"ON"}},
-        {"Hosts": {"host_name": "host2", "maintenance_state":"ON"}}
+        {"Hosts": {"host_name": "host1",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         },
+        {"Hosts": {"host_name": "host2",
+                   "maintenance_state":"ON",
+                   "cpu_count": 1,
+                   "total_mem": 2097152,
+                   "disk_info": [{
+                     "size": '80000000',
+                     "mountpoint": "/"
+                   }]
+                   }
+         }
       ]
     }
 

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py 
b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
index 1145154..a486fb3 100644
--- a/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.0.6/common/test_stack_advisor.py
@@ -3253,11 +3253,11 @@ class TestHDP206StackAdvisor(TestCase):
     self.assertEquals(self.stack_advisor_impl.round_to_n(4097), 4096)
 
   def test_getMountPointForDir(self):
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", 
["/"]), "/")
-    self.assertEquals(self.stack_advisor_impl.getMountPointForDir("/var/log", 
["/var", "/"]), "/var")
-    
self.assertEquals(self.stack_advisor_impl.getMountPointForDir("file:///var/log",
 ["/var", "/"]), "/var")
-    
self.assertEquals(self.stack_advisor_impl.getMountPointForDir("hdfs:///hdfs_path",
 ["/var", "/"]), None)
-    
self.assertEquals(self.stack_advisor_impl.getMountPointForDir("relative/path", 
["/var", "/"]), None)
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", 
["/"]), "/")
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("/var/log", 
["/var", "/"]), "/var")
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("file:///var/log", 
["/var", "/"]), "/var")
+    
self.assertEquals(self.stackAdvisor.getMountPointForDir("hdfs:///hdfs_path", 
["/var", "/"]), None)
+    self.assertEquals(self.stackAdvisor.getMountPointForDir("relative/path", 
["/var", "/"]), None)
 
   def test_parseCardinality(self):
     self.assertEquals(self.stackAdvisor.parseCardinality("ALL", 5), (5, 5))

http://git-wip-us.apache.org/repos/asf/ambari/blob/326cc1b2/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
----------------------------------------------------------------------
diff --git 
a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py 
b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
index f9fb1f5..e8bd5d0 100644
--- a/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
+++ b/ambari-server/src/test/python/stacks/2.1/common/test_stack_advisor.py
@@ -168,11 +168,8 @@ class TestHDP21StackAdvisor(TestCase):
       if len(components) > 0:
         groups.append(components)
 
-    def sort_nested_lists(list):
-      result_list = []
-      for sublist in list:
-        result_list.append(sorted(sublist))
-      return sorted(result_list)
+    def sort_nested_lists(l):
+      # Flatten the list of lists and sort the result, so that layouts are compared
+      # regardless of grouping and order. A comprehension also handles an empty
+      # list, which reduce() without an initial value rejects.
+      return sorted([item for sublist in l for item in sublist])
 
     self.assertEquals(sort_nested_lists(expected_layout), 
sort_nested_lists(groups))
 

Reply via email to