Repository: ambari
Updated Branches:
  refs/heads/trunk e019c6c15 -> c00908495
AMBARI-13396: RU: Handle Namenode being down scenarios (jluniya)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/c0090849
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/c0090849
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/c0090849

Branch: refs/heads/trunk
Commit: c00908495953e7c725bd49b7a124883d12621324
Parents: e019c6c
Author: Jayush Luniya <[email protected]>
Authored: Wed Oct 14 15:43:34 2015 -0700
Committer: Jayush Luniya <[email protected]>
Committed: Wed Oct 14 15:43:34 2015 -0700

----------------------------------------------------------------------
 .../2.1.0.2.0/package/scripts/params_linux.py |   2 +
 .../HDFS/2.1.0.2.0/package/scripts/utils.py   | 117 +++++++++----------
 2 files changed, 58 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/c0090849/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
index 563c234..38eac2e 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/params_linux.py
@@ -55,6 +55,8 @@ security_enabled = config['configurations']['cluster-env']['security_enabled']
 hdfs_user = status_params.hdfs_user
 root_user = "root"
 hadoop_pid_dir_prefix = status_params.hadoop_pid_dir_prefix
+namenode_pid_file = status_params.namenode_pid_file
+zkfc_pid_file = status_params.zkfc_pid_file
 
 # Some datanode settings
 dfs_dn_addr = default('/configurations/hdfs-site/dfs.datanode.address', None)
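
The two new params only re-export pid-file paths that status_params now
computes; the status_params change itself is not part of this diff. As a
rough standalone sketch (hypothetical helper name and example values, using
the same path formula as the get_service_pid_file() helper removed from
utils.py below):

    # Hypothetical sketch -- the real derivation lives in status_params.py,
    # which this diff does not show. Path formula mirrors the removed helper:
    # {hadoop_pid_dir_prefix}/{user}/hadoop-{user}-{name}.pid
    def hadoop_pid_file(hadoop_pid_dir_prefix, user, name):
      """Conventional pid-file path for a Hadoop daemon started by `user`."""
      return "{0}/{1}/hadoop-{1}-{2}.pid".format(hadoop_pid_dir_prefix, user, name)

    # Example values, assuming the stock /var/run/hadoop prefix and the hdfs user:
    namenode_pid_file = hadoop_pid_file("/var/run/hadoop", "hdfs", "namenode")
    # -> /var/run/hadoop/hdfs/hadoop-hdfs-namenode.pid
    zkfc_pid_file = hadoop_pid_file("/var/run/hadoop", "hdfs", "zkfc")
    # -> /var/run/hadoop/hdfs/hadoop-hdfs-zkfc.pid
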
http://git-wip-us.apache.org/repos/asf/ambari/blob/c0090849/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
index 7bc0b6a..d1e764a 100644
--- a/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
+++ b/ambari-server/src/main/resources/common-services/HDFS/2.1.0.2.0/package/scripts/utils.py
@@ -31,6 +31,7 @@ from resource_management.core.exceptions import ComponentIsNotRunning
 from resource_management.core.logger import Logger
 from resource_management.libraries.functions.curl_krb_request import curl_krb_request
 from resource_management.core.exceptions import Fail
+from resource_management.libraries.functions.namenode_ha_utils import get_namenode_states
 
 from zkfc_slave import ZkfcSlave
 
@@ -74,50 +75,59 @@ def initiate_safe_zkfc_failover():
     Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"),
             user = params.hdfs_user)
 
-  check_service_cmd = format("hdfs haadmin -getServiceState {namenode_id}")
-  code, out = shell.call(check_service_cmd, logoutput=True, user=params.hdfs_user)
-
-  original_state = "unknown"
-  if code == 0 and out:
-    original_state = "active" if "active" in out else ("standby" if "standby" in out else original_state)
-    Logger.info("Namenode service state: %s" % original_state)
-
-    if original_state == "active":
-      msg = "Rolling Upgrade - Initiating a ZKFC failover on {0} NameNode host {1}.".format(original_state, params.hostname)
-      Logger.info(msg)
-
-      check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
-      failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
-
-      code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
-      Logger.info(format("Rolling Upgrade - failover command returned {code}"))
-      wait_for_standby = False
-
-      if code == 0:
-        wait_for_standby = True
+  active_namenode_id = None
+  standby_namenode_id = None
+  active_namenodes, standby_namenodes, unknown_namenodes = get_namenode_states(params.hdfs_site, params.security_enabled, params.hdfs_user)
+  if active_namenodes:
+    active_namenode_id = active_namenodes[0][0]
+  if standby_namenodes:
+    standby_namenode_id = standby_namenodes[0][0]
+
+  if active_namenode_id:
+    Logger.info(format("Active NameNode id: {active_namenode_id}"))
+  if standby_namenode_id:
+    Logger.info(format("Standby NameNode id: {standby_namenode_id}"))
+  if unknown_namenodes:
+    for unknown_namenode in unknown_namenodes:
+      Logger.info("NameNode HA state for {0} is unknown".format(unknown_namenode[0]))
+
+  if params.namenode_id == active_namenode_id and params.other_namenode_id == standby_namenode_id:
+    # Failover if this NameNode is active and other NameNode is up and in standby (i.e. ready to become active on failover)
+    Logger.info(format("NameNode {namenode_id} is active and NameNode {other_namenode_id} is in standby"))
+
+    failover_command = format("hdfs haadmin -failover {namenode_id} {other_namenode_id}")
+    check_standby_cmd = format("hdfs haadmin -getServiceState {namenode_id} | grep standby")
+
+    msg = "Rolling Upgrade - Initiating a ZKFC failover on active NameNode host {0}.".format(params.hostname)
+    Logger.info(msg)
+    code, out = shell.call(failover_command, user=params.hdfs_user, logoutput=True)
+    Logger.info(format("Rolling Upgrade - failover command returned {code}"))
+    wait_for_standby = False
+
+    if code == 0:
+      wait_for_standby = True
+    else:
+      # Try to kill ZKFC manually
+      was_zkfc_killed = kill_zkfc(params.hdfs_user)
+      code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
+      Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
+      if code == 255 and out:
+        Logger.info("Rolling Upgrade - NameNode is already down.")
       else:
-        # Try to kill ZKFC manually
-        was_zkfc_killed = kill_zkfc(params.hdfs_user)
-        code, out = shell.call(check_standby_cmd, user=params.hdfs_user, logoutput=True)
-        Logger.info(format("Rolling Upgrade - check for standby returned {code}"))
-
-        if code == 255 and out:
-          Logger.info("Rolling Upgrade - namenode is already down.")
-        else:
-          if was_zkfc_killed:
-            # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
-            wait_for_standby = True
-
-      if wait_for_standby:
-        Logger.info("Waiting for this NameNode to become the standby one.")
-        Execute(check_standby_cmd,
-                user=params.hdfs_user,
-                tries=50,
-                try_sleep=6,
-                logoutput=True)
+        if was_zkfc_killed:
+          # Only mandate that this be the standby namenode if ZKFC was indeed killed to initiate a failover.
+          wait_for_standby = True
+
+    if wait_for_standby:
+      Logger.info("Waiting for this NameNode to become the standby one.")
+      Execute(check_standby_cmd,
+              user=params.hdfs_user,
+              tries=50,
+              try_sleep=6,
+              logoutput=True)
   else:
-    raise Fail("Unable to determine NameNode HA states by calling command: {0}".format(check_service_cmd))
-
+    msg = "Rolling Upgrade - Skipping ZKFC failover on NameNode host {0}.".format(params.hostname)
+    Logger.info(msg)
 
 def kill_zkfc(zkfc_user):
   """
@@ -129,36 +139,21 @@ def kill_zkfc(zkfc_user):
   """
   import params
   if params.dfs_ha_enabled:
-    zkfc_pid_file = get_service_pid_file("zkfc", zkfc_user)
-    if zkfc_pid_file:
+    if params.zkfc_pid_file:
       check_process = as_user(format("ls {zkfc_pid_file} > /dev/null 2>&1 && ps -p `cat {zkfc_pid_file}` > /dev/null 2>&1"), user=zkfc_user)
       code, out = shell.call(check_process)
       if code == 0:
         Logger.debug("ZKFC is running and will be killed.")
         kill_command = format("kill -15 `cat {zkfc_pid_file}`")
         Execute(kill_command,
-             user=zkfc_user
+                user=zkfc_user
         )
-        File(zkfc_pid_file,
+        File(params.zkfc_pid_file,
              action = "delete",
-             )
+             )
         return True
   return False
 
-
-def get_service_pid_file(name, user):
-  """
-  Get the pid file path that was used to start the service by the user.
-  :param name: Service name
-  :param user: User that started the service.
-  :return: PID file path
-  """
-  import params
-  pid_dir = format("{hadoop_pid_dir_prefix}/{user}")
-  pid_file = format("{pid_dir}/hadoop-{user}-{name}.pid")
-  return pid_file
-
-
 def service(action=None, name=None, user=None, options="", create_pid_dir=False,
             create_log_dir=False):
   """
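
Taken together, the utils.py change replaces the old behavior -- shell out to
"hdfs haadmin -getServiceState" and raise Fail when the HA state cannot be
determined -- with a cluster-wide view from get_namenode_states(), and it now
skips the failover (rather than failing) unless this NameNode is active and
the other NameNode is a live standby. A standalone sketch of that gate,
assuming only the return shape used above (three lists -- active, standby,
unknown -- of tuples whose first element is the NameNode id; the ids and
hostnames below are made up):

    # Sketch of the new failover gate, not the Ambari implementation itself.
    def should_failover(namenode_id, other_namenode_id,
                        active_namenodes, standby_namenodes, unknown_namenodes):
      active_id = active_namenodes[0][0] if active_namenodes else None
      standby_id = standby_namenodes[0][0] if standby_namenodes else None
      for unknown in unknown_namenodes:
        print("NameNode HA state for {0} is unknown".format(unknown[0]))
      # Failover only when this NameNode is active and the other is a live standby.
      return namenode_id == active_id and other_namenode_id == standby_id

    # Other NameNode down (state unknown): the failover is skipped instead of
    # raising Fail -- the "NameNode being down" scenario AMBARI-13396 handles.
    print(should_failover("nn1", "nn2",
                          [("nn1", "c6401.ambari.apache.org:50070")], [],
                          [("nn2", "c6402.ambari.apache.org:50070")]))   # False
    print(should_failover("nn1", "nn2",
                          [("nn1", "c6401.ambari.apache.org:50070")],
                          [("nn2", "c6402.ambari.apache.org:50070")], []))  # True
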
