This is an automated email from the ASF dual-hosted git repository. swagle pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 6d2c6bab0ba57c362bc06871f866b1d6a914e5eb Author: majorendre <majorendre> AuthorDate: Thu Feb 8 11:10:29 2018 +0100 Federation support added to namenode_ha_utils.py. So far the code could handle only a single name service. Added support for multiple name services. --- .../resource_management/TestNamenodeHaUtils.py | 15 ++- .../libraries/functions/namenode_ha_utils.py | 109 +++++++++++++-------- 2 files changed, 81 insertions(+), 43 deletions(-) diff --git a/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py b/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py index 2fc4904..6f2e5aa 100644 --- a/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py +++ b/ambari-agent/src/test/python/resource_management/TestNamenodeHaUtils.py @@ -19,7 +19,7 @@ limitations under the License. ''' from unittest import TestCase from resource_management.libraries.functions.namenode_ha_utils import \ - get_nameservice + get_nameservices class TestNamenodeHaUtils(TestCase): @@ -39,7 +39,7 @@ class TestNamenodeHaUtils(TestCase): "dfs.namenode.rpc-address.HAB.nn2": "hostb2:8020", } - self.assertEqual("HAA", get_nameservice(hdfs_site)) + self.assertEqual(["HAA"], get_nameservices(hdfs_site)) # dfs.internal.nameservices not in hdfs-site hdfs_site = { @@ -52,9 +52,16 @@ class TestNamenodeHaUtils(TestCase): "dfs.namenode.rpc-address.HAB.nn2": "hostb2:8020", } - self.assertEqual("HAA", get_nameservice(hdfs_site)) + self.assertEqual(["HAA"], get_nameservices(hdfs_site)) # Non HA hdfs_site = {} - self.assertEqual(None, get_nameservice(hdfs_site)) + self.assertEqual([], get_nameservices(hdfs_site)) + + # federated config dfs.internal.nameservices in hdfs-site + hdfs_site = { + "dfs.internal.nameservices": "ns1,ns2", + } + + self.assertEqual(["ns1","ns2"], get_nameservices(hdfs_site)) diff --git a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py 
b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py index 7a2635f..d3a9c2d 100644 --- a/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py +++ b/ambari-common/src/main/python/resource_management/libraries/functions/namenode_ha_utils.py @@ -28,7 +28,7 @@ from resource_management.libraries.functions.hdfs_utils import is_https_enabled_ __all__ = ["get_namenode_states", "get_active_namenode", - "get_property_for_active_namenode", "get_nameservice"] + "get_property_for_active_namenode", "get_nameservices"] HDFS_NN_STATE_ACTIVE = 'active' HDFS_NN_STATE_STANDBY = 'standby' @@ -65,15 +65,32 @@ def get_namenode_states(hdfs_site, security_enabled, run_user, times=10, sleep_t doRetries.attempt = 0 return doRetries(hdfs_site, security_enabled, run_user) + def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_retry=True): """ + returns data for all name nodes of all name services + """ + active_namenodes = [] + standby_namenodes = [] + unknown_namenodes = [] + + name_services = get_nameservices(hdfs_site) + for name_service in name_services: + active, standby, unknown = _get_namenode_states_noretries_single_ns(hdfs_site, name_service, security_enabled, run_user, last_retry) + active_namenodes += active + standby_namenodes += standby + unknown_namenodes += unknown + return active_namenodes, standby_namenodes, unknown_namenodes + + +def _get_namenode_states_noretries_single_ns(hdfs_site, name_service, security_enabled, run_user, last_retry=True): + """ return format [('nn1', 'hdfs://hostname1:port1'), ('nn2', 'hdfs://hostname2:port2')] , [....], [....] """ active_namenodes = [] standby_namenodes = [] unknown_namenodes = [] - - name_service = get_nameservice(hdfs_site) + nn_unique_ids_key = 'dfs.ha.namenodes.' 
+ name_service # now we have something like 'nn1,nn2,nn3,nn4' @@ -90,7 +107,7 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_re else: key = NAMENODE_HTTPS_FRAGMENT.format(name_service,nn_unique_id) protocol = "https" - + if key in hdfs_site: # use str() to ensure that unicode strings do not have the u' in them value = str(hdfs_site[key]) @@ -101,11 +118,11 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_re value = value.replace(INADDR_ANY, rpc_host) jmx_uri = JMX_URI_FRAGMENT.format(protocol, value) - + state = get_value_from_jmx(jmx_uri, 'tag.HAState', security_enabled, run_user, is_https_enabled, last_retry) # If JMX parsing failed if not state: - check_service_cmd = "hdfs haadmin -ns {0} -getServiceState {1}".format(get_nameservice(hdfs_site), nn_unique_id) + check_service_cmd = "hdfs haadmin -ns {0} -getServiceState {1}".format(name_service, nn_unique_id) code, out = shell.call(check_service_cmd, logoutput=True, user=run_user) if code == 0 and out: if HDFS_NN_STATE_STANDBY in out: @@ -122,22 +139,19 @@ def get_namenode_states_noretries(hdfs_site, security_enabled, run_user, last_re return active_namenodes, standby_namenodes, unknown_namenodes -def is_ha_enabled(hdfs_site): - dfs_ha_nameservices = get_nameservice(hdfs_site) - - if not dfs_ha_nameservices or is_empty(dfs_ha_nameservices): - return False - - dfs_ha_namenode_ids = hdfs_site[format("dfs.ha.namenodes.{dfs_ha_nameservices}")] - - if not is_empty(dfs_ha_namenode_ids): - dfs_ha_namemodes_ids_list = dfs_ha_namenode_ids.split(",") - dfs_ha_namenode_ids_array_len = len(dfs_ha_namemodes_ids_list) - if dfs_ha_namenode_ids_array_len > 1: - return True - + +def _is_ha_config(hdfs_site): + """ + returns True if an HA config is used + """ + name_services = hdfs_site.get('dfs.nameservices', None) + if name_services: + for ns in name_services.split(","): + if hdfs_site.get('dfs.ha.namenodes.'+ns): + return True return False + def 
get_active_namenode(hdfs_site, security_enabled, run_user): """ return format is nn_unique_id and it's address ('nn1', 'hdfs://hostname1:port1') @@ -147,22 +161,26 @@ def get_active_namenode(hdfs_site, security_enabled, run_user): return active_namenodes[0] raise Fail('No active NameNode was found.') - + def get_property_for_active_namenode(hdfs_site, property_name, security_enabled, run_user): """ For dfs.namenode.rpc-address: - In non-ha mode it will return hdfs_site[dfs.namenode.rpc-address] - In ha-mode it will return hdfs_site[dfs.namenode.rpc-address.nnha.nn2], where nnha is the name of HA, and nn2 is id of active NN + - In federated mode it fails since there is more than one active namenode """ value = None rpc_key = None - if is_ha_enabled(hdfs_site): - name_service = get_nameservice(hdfs_site) + if _is_ha_config(hdfs_site): + name_services = get_nameservices(hdfs_site) + if len(name_services) > 1: + raise Fail('Multiple name services not supported by this function') + name_service = name_services(hdfs_site)[0] active_namenodes = get_namenode_states(hdfs_site, security_enabled, run_user)[0] - + if not len(active_namenodes): raise Fail("There is no active namenodes.") - + active_namenode_id = active_namenodes[0][0] value = hdfs_site[format("{property_name}.{name_service}.{active_namenode_id}")] rpc_key = NAMENODE_RPC_FRAGMENT.format(name_service,active_namenode_id) @@ -182,15 +200,24 @@ def get_all_namenode_addresses(hdfs_site): """ - In non-ha mode it will return list of hdfs_site[dfs.namenode.http[s]-address] - In ha-mode it will return list of hdfs_site[dfs.namenode.http-address.NS.Uid], where NS is the name of HA, and Uid is id of NameNode + - In federated mode it will return all namenodes for internal name services """ nn_addresses = [] + name_services = get_nameservices(hdfs_site) + if not name_services: + name_services = [None] #fall back to config handling without name services + for ns in name_services: + nn_addresses += 
_get_all_namenode_addresses_single_ns(hdfs_site, ns) + return nn_addresses + +def _get_all_namenode_addresses_single_ns(hdfs_site, name_service): + nn_addresses = [] http_policy = 'HTTP_ONLY' if DFS_HTTP_POLICY in hdfs_site: http_policy = hdfs_site[DFS_HTTP_POLICY] - if is_ha_enabled(hdfs_site): - name_service = get_nameservice(hdfs_site) + if _is_ha_config(hdfs_site): nn_unique_ids_key = 'dfs.ha.namenodes.' + name_service nn_unique_ids = hdfs_site[nn_unique_ids_key].split(',') for nn_unique_id in nn_unique_ids: @@ -220,32 +247,36 @@ def get_all_namenode_addresses(hdfs_site): return nn_addresses -def get_nameservice(hdfs_site): +def get_nameservices(hdfs_site): """ Multiple nameservices can be configured for example to support seamless distcp between two HA clusters. The nameservices are defined as a comma separated list in hdfs_site['dfs.nameservices']. The parameter hdfs['dfs.internal.nameservices'] was introduced in Hadoop 2.6 to denote the nameservice for the current cluster (HDFS-6376). + In federated mode multiple name services will be returned. This method uses hdfs['dfs.internal.nameservices'] to get the current - nameservice, if that parameter is not available it tries to splits the value + nameservice(s), if that parameter is not available it tries to splits the value in hdfs_site['dfs.nameservices'] returning the first string or what is contained in hdfs_site['dfs.namenode.shared.edits.dir']. By default hdfs_site['dfs.nameservices'] is returned. 
:param hdfs_site: - :return: string or empty + :return: list of string or an empty list """ - name_service = hdfs_site.get('dfs.internal.nameservices', None) - if not name_service: - name_service = hdfs_site.get('dfs.nameservices', None) - if name_service and ',' in name_service: + name_services_param = hdfs_site.get('dfs.internal.nameservices', None) #in Federated mode this can be a list + if name_services_param: + name_services = name_services_param.split(",") + return name_services + + name_services_string = hdfs_site.get('dfs.nameservices', None) + + if name_services_string and ',' in name_services_string: import re - for ns in name_service.split(","): + for ns in name_services_string.split(","): if 'dfs.namenode.shared.edits.dir' in hdfs_site and re.match(r'.*%s$' % ns, hdfs_site['dfs.namenode.shared.edits.dir']): # better would be core_site['fs.defaultFS'] but it's not available - return ns - return name_service.split(",")[0] # default to return the first nameservice - - return name_service + return [ns] + return [name_services_string.split(",")[0]] # default to return the first nameservice + return [] -- To stop receiving notification emails like this one, please contact swa...@apache.org.