This is an automated email from the ASF dual-hosted git repository.

swagle pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 6d2c6bab0ba57c362bc06871f866b1d6a914e5eb
Author: majorendre <majorendre>
AuthorDate: Thu Feb 8 11:10:29 2018 +0100

    Federation support added to namenode_ha_utils.py
    
    So far the code could handle only a single name service. Added support for multiple (federated) name services.
---
 .../resource_management/TestNamenodeHaUtils.py     |  15 ++-
 .../libraries/functions/namenode_ha_utils.py       | 109 +++++++++++++--------
 2 files changed, 81 insertions(+), 43 deletions(-)

diff --git 
a/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py 
b/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py
index 2fc4904..6f2e5aa 100644
--- a/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py
+++ b/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py
@@ -19,7 +19,7 @@ limitations under the License.
 '''
 from unittest import TestCase
 from resource_management.libraries.functions.namenode_ha_utils import \
-  get_nameservice
+  get_nameservices
 
 
 class TestNamenodeHaUtils(TestCase):
@@ -39,7 +39,7 @@ class TestNamenodeHaUtils(TestCase):
       "dfs.namenode.rpc-address.HAB.nn2": "hostb2:8020",
     }
 
-    self.assertEqual("HAA", get_nameservice(hdfs_site))
+    self.assertEqual(["HAA"], get_nameservices(hdfs_site))
 
     # dfs.internal.nameservices not in hdfs-site
     hdfs_site = {
@@ -52,9 +52,16 @@ class TestNamenodeHaUtils(TestCase):
       "dfs.namenode.rpc-address.HAB.nn2": "hostb2:8020",
     }
 
-    self.assertEqual("HAA", get_nameservice(hdfs_site))
+    self.assertEqual(["HAA"], get_nameservices(hdfs_site))
 
     # Non HA
     hdfs_site = {}
 
-    self.assertEqual(None, get_nameservice(hdfs_site))
+    self.assertEqual([], get_nameservices(hdfs_site))
+
+    # federated config dfs.internal.nameservices in hdfs-site
+    hdfs_site = {
+      "dfs.internal.nameservices": "ns1,ns2",
+    }
+
+    self.assertEqual(["ns1","ns2"], get_nameservices(hdfs_site))
diff --git 
a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
 
b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
index 7a2635f..d3a9c2d 100644
--- 
a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
+++ 
b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py
@@ -28,7 +28,7 @@ from resource_management.libraries.functions.hdfs_utils 
import is_https_enabled_
 
 
 __all__ = ["get_namenode_states", "get_active_namenode",
-           "get_property_for_active_namenode", "get_nameservice"]
+           "get_property_for_active_namenode", "get_nameservices"]
 
 HDFS_NN_STATE_ACTIVE = 'active'
 HDFS_NN_STATE_STANDBY = 'standby'
@@ -65,15 +65,32 @@ def get_namenode_states(hdfs_site, security_enabled, 
run_user, times=10, sleep_t
   doRetries.attempt = 0
   return doRetries(hdfs_site, security_enabled, run_user)
 
+
 def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, 
last_retry=True):
   """
+  returns data for all name nodes of all name services
+  """
+  active_namenodes = []
+  standby_namenodes = []
+  unknown_namenodes = []
+
+  name_services = get_nameservices(hdfs_site)
+  for name_service in name_services:
+    active, standby, unknown = 
_get_namenode_states_noretries_single_ns(hdfs_site, name_service, 
security_enabled, run_user, last_retry)
+    active_namenodes += active
+    standby_namenodes += standby
+    unknown_namenodes += unknown
+  return active_namenodes, standby_namenodes, unknown_namenodes
+
+
+def _get_namenode_states_noretries_single_ns(hdfs_site, name_service, 
security_enabled, run_user, last_retry=True):
+  """
   return format [('nn1', 'hdfs://hostname1:port1'), ('nn2', 
'hdfs://hostname2:port2')] , [....], [....]
   """
   active_namenodes = []
   standby_namenodes = []
   unknown_namenodes = []
-  
-  name_service = get_nameservice(hdfs_site)
+
   nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
 
   # now we have something like 'nn1,nn2,nn3,nn4'
@@ -90,7 +107,7 @@ def get_namenode_states_noretries(hdfs_site, 
security_enabled, run_user, last_re
     else:
       key = NAMENODE_HTTPS_FRAGMENT.format(name_service,nn_unique_id)
       protocol = "https"
-    
+
     if key in hdfs_site:
       # use str() to ensure that unicode strings do not have the u' in them
       value = str(hdfs_site[key])
@@ -101,11 +118,11 @@ def get_namenode_states_noretries(hdfs_site, 
security_enabled, run_user, last_re
           value = value.replace(INADDR_ANY, rpc_host)
 
       jmx_uri = JMX_URI_FRAGMENT.format(protocol, value)
-      
+
       state = get_value_from_jmx(jmx_uri, 'tag.HAState', security_enabled, 
run_user, is_https_enabled, last_retry)
       # If JMX parsing failed
       if not state:
-        check_service_cmd = "hdfs haadmin -ns {0} -getServiceState 
{1}".format(get_nameservice(hdfs_site), nn_unique_id)
+        check_service_cmd = "hdfs haadmin -ns {0} -getServiceState 
{1}".format(name_service, nn_unique_id)
         code, out = shell.call(check_service_cmd, logoutput=True, 
user=run_user)
         if code == 0 and out:
           if HDFS_NN_STATE_STANDBY in out:
@@ -122,22 +139,19 @@ def get_namenode_states_noretries(hdfs_site, 
security_enabled, run_user, last_re
 
   return active_namenodes, standby_namenodes, unknown_namenodes
 
-def is_ha_enabled(hdfs_site):
-  dfs_ha_nameservices = get_nameservice(hdfs_site)
-  
-  if not dfs_ha_nameservices or is_empty(dfs_ha_nameservices):
-    return False
-  
-  dfs_ha_namenode_ids = 
hdfs_site[format("dfs.ha.namenodes.{dfs_ha_nameservices}")]
-  
-  if not is_empty(dfs_ha_namenode_ids):
-    dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",")
-    dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list)
-    if dfs_ha_namenode_ids_array_len > 1:
-      return True
-      
+
+def _is_ha_config(hdfs_site):
+  """
+  returns True if an HA config is used
+  """
+  name_services = hdfs_site.get('dfs.nameservices', None)
+  if name_services:
+    for ns in name_services.split(","):
+      if hdfs_site.get('dfs.ha.namenodes.'+ns):
+        return True
   return False
 
+
 def get_active_namenode(hdfs_site, security_enabled, run_user):
   """
   return format is nn_unique_id and it's address ('nn1', 
'hdfs://hostname1:port1')
@@ -147,22 +161,26 @@ def get_active_namenode(hdfs_site, security_enabled, run_user):
     return active_namenodes[0]
 
   raise Fail('No active NameNode was found.')
-  
+
 def get_property_for_active_namenode(hdfs_site, property_name, security_enabled, run_user):
   """
   For dfs.namenode.rpc-address:
     - In non-ha mode it will return hdfs_site[dfs.namenode.rpc-address]
     - In ha-mode it will return hdfs_site[dfs.namenode.rpc-address.nnha.nn2], where nnha is the name of HA, and nn2 is id of active NN
+    - In federated mode it fails since there is more than one active namenode
   """
   value = None
   rpc_key = None
-  if is_ha_enabled(hdfs_site):
-    name_service = get_nameservice(hdfs_site)
+  if _is_ha_config(hdfs_site):
+    name_services = get_nameservices(hdfs_site)
+    if len(name_services) > 1:
+      raise Fail('Multiple name services not supported by this function')
+    name_service = name_services[0]
     active_namenodes = get_namenode_states(hdfs_site, security_enabled, run_user)[0]
-    
+
     if not len(active_namenodes):
       raise Fail("There is no active namenodes.")
-    
+
     active_namenode_id = active_namenodes[0][0]
     value = hdfs_site[format("{property_name}.{name_service}.{active_namenode_id}")]
     rpc_key = NAMENODE_RPC_FRAGMENT.format(name_service,active_namenode_id)
@@ -182,15 +200,24 @@ def get_all_namenode_addresses(hdfs_site):
   """
   - In non-ha mode it will return list of 
hdfs_site[dfs.namenode.http[s]-address]
   - In ha-mode it will return list of 
hdfs_site[dfs.namenode.http-address.NS.Uid], where NS is the name of HA, and 
Uid is id of NameNode
+  - In federated mode it will return all namenodes for internal name services
   """
   nn_addresses = []
+  name_services = get_nameservices(hdfs_site)
+  if not name_services:
+    name_services = [None] #fall back to config handling without name services
+  for ns in name_services:
+    nn_addresses += _get_all_namenode_addresses_single_ns(hdfs_site, ns)
+  return nn_addresses
+
+def _get_all_namenode_addresses_single_ns(hdfs_site, name_service):
+  nn_addresses = []
   http_policy = 'HTTP_ONLY'
 
   if DFS_HTTP_POLICY in hdfs_site:
     http_policy = hdfs_site[DFS_HTTP_POLICY]
 
-  if is_ha_enabled(hdfs_site):
-    name_service = get_nameservice(hdfs_site)
+  if _is_ha_config(hdfs_site):
     nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service
     nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',')
     for nn_unique_id in nn_unique_ids:
@@ -220,32 +247,36 @@ def get_all_namenode_addresses(hdfs_site):
 
   return nn_addresses
 
-def get_nameservice(hdfs_site):
+def get_nameservices(hdfs_site):
   """
   Multiple nameservices can be configured for example to support seamless distcp
   between two HA clusters. The nameservices are defined as a comma separated
   list in hdfs_site['dfs.nameservices']. The parameter
   hdfs['dfs.internal.nameservices'] was introduced in Hadoop 2.6 to denote the
   nameservice for the current cluster (HDFS-6376).
+  In federated mode multiple name services will be returned.
 
   This method uses hdfs['dfs.internal.nameservices'] to get the current
-  nameservice, if that parameter is not available it tries to splits the value
+  nameservice(s); if that parameter is not available it splits the value
   in hdfs_site['dfs.nameservices'] returning the first string or what is
   contained in hdfs_site['dfs.namenode.shared.edits.dir'].
 
   By default hdfs_site['dfs.nameservices'] is returned.
   :param hdfs_site:
-  :return: string or empty
+  :return: list of strings, or an empty list if no nameservice is configured
   """
-  name_service = hdfs_site.get('dfs.internal.nameservices', None)
-  if not name_service:
-    name_service = hdfs_site.get('dfs.nameservices', None)
 
-  if name_service and ',' in name_service:
+  name_services_param = hdfs_site.get('dfs.internal.nameservices', None) #in Federated mode this can be a list
+  if name_services_param:
+    name_services = name_services_param.split(",")
+    return name_services
+
+  name_services_string = hdfs_site.get('dfs.nameservices', None)
+
+  if name_services_string:  # also covers a single (non-federated) HA nameservice with no comma
     import re
-    for ns in name_service.split(","):
+    for ns in name_services_string.split(","):
       if 'dfs.namenode.shared.edits.dir' in hdfs_site and re.match(r'.*%s$' % ns, hdfs_site['dfs.namenode.shared.edits.dir']): # better would be core_site['fs.defaultFS'] but it's not available
-        return ns
-    return name_service.split(",")[0] # default to return the first nameservice
-
-  return name_service
+        return [ns]
+    return [name_services_string.split(",")[0]] # default to return the first nameservice
+  return []

-- 
To stop receiving notification emails like this one, please contact
swa...@apache.org.

Reply via email to